ConsumeKinesis 2.7.0

Bundle
org.apache.nifi | nifi-aws-kinesis-nar
Description
Consumes data from the specified AWS Kinesis stream and outputs a FlowFile for every processed Record (raw) or a FlowFile for a batch of processed records if a Record Reader and Record Writer are configured. The processor may take a few minutes on the first start, and several seconds on subsequent starts, to initialize before it begins fetching data. Uses DynamoDB for checkpointing and coordination, and optionally CloudWatch for metrics.
Tags
amazon, aws, consume, kinesis, record, stream
Input Requirement
FORBIDDEN
Supports Sensitive Dynamic Properties
false
  • Additional Details for ConsumeKinesis 2.7.0

    ConsumeKinesis

    IAM permissions required for ConsumeKinesis

    You must add the following permissions to the IAM role or user configured in the AWS Credentials Provider Service.

    CloudWatch permissions are needed only when the Metrics Publishing property is set to CloudWatch.

    • Amazon Kinesis Data Streams
      Actions: DescribeStream, DescribeStreamSummary, RegisterStreamConsumer
      Resources: arn:aws:kinesis:{Region}:{Account}:stream/{Stream Name} (the Kinesis data stream from which ConsumeKinesis will process data)
      Purpose: Before attempting to read records, the consumer checks that the data stream exists, that it is active, and which shards it contains. Also registers consumers to a shard.
    • Amazon Kinesis Data Streams
      Actions: GetRecords, GetShardIterator, ListShards
      Resources: arn:aws:kinesis:{Region}:{Account}:stream/{Stream Name} (the Kinesis data stream from which ConsumeKinesis will process data)
      Purpose: Reads records from a shard.
    • Amazon Kinesis Data Streams
      Actions: SubscribeToShard, DescribeStreamConsumer
      Resources: arn:aws:kinesis:{Region}:{Account}:stream/{Stream Name}/consumer/* (add these actions only if you use enhanced fan-out (EFO) consumers)
      Purpose: Subscribes to a shard for enhanced fan-out (EFO) consumers.
    • Amazon DynamoDB
      Actions: CreateTable, DescribeTable, UpdateTable, Scan, GetItem, PutItem, UpdateItem, DeleteItem
      Resources: arn:aws:dynamodb:{Region}:{Account}:table/{Application Name} (the lease table, a metadata table created in DynamoDB by ConsumeKinesis)
      Purpose: Required for ConsumeKinesis to manage the lease table created in DynamoDB.
    • Amazon DynamoDB
      Actions: CreateTable, DescribeTable, Scan, GetItem, PutItem, UpdateItem, DeleteItem
      Resources: arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}-WorkerMetricStats and arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}-CoordinatorState (the worker metrics and coordinator state tables, metadata tables created in DynamoDB by ConsumeKinesis)
      Purpose: Required for ConsumeKinesis to manage the worker metrics and coordinator state metadata tables in DynamoDB.
    • Amazon DynamoDB
      Actions: Query
      Resources: arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}/index/* (the global secondary index on the lease table)
      Purpose: Required for ConsumeKinesis to read the global secondary index of the lease table created in DynamoDB.
    • Amazon CloudWatch
      Actions: PutMetricData
      Resources: * (the asterisk is used because there is no specific CloudWatch resource on which the PutMetricData action is invoked)
      Purpose: Uploads metrics to CloudWatch that are useful for monitoring the application.

    The following is an example policy document for ConsumeKinesis.

    {
        "Version":"2012-10-17",		 	 	 
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:DescribeStreamSummary",
                    "kinesis:RegisterStreamConsumer",
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                    "kinesis:ListShards"
                ],
                "Resource": [
                    "arn:aws:kinesis:{Region}:{Account}:stream/{Stream Name}"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "kinesis:SubscribeToShard",
                    "kinesis:DescribeStreamConsumer"
                ],
                "Resource": [
                    "arn:aws:kinesis:{Region}:{Account}:stream/{Stream Name}/consumer/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "dynamodb:CreateTable",
                    "dynamodb:DescribeTable",
                    "dynamodb:UpdateTable",
                    "dynamodb:GetItem",
                    "dynamodb:UpdateItem",
                    "dynamodb:PutItem",
                    "dynamodb:DeleteItem",
                    "dynamodb:Scan"
                ],
                "Resource": [
                    "arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "dynamodb:CreateTable",
                    "dynamodb:DescribeTable",
                    "dynamodb:GetItem",
                    "dynamodb:UpdateItem",
                    "dynamodb:PutItem",
                    "dynamodb:DeleteItem",
                    "dynamodb:Scan"
                ],
                "Resource": [
                    "arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}-WorkerMetricStats",
                    "arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}-CoordinatorState"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "dynamodb:Query"
                ],
                "Resource": [
                    "arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}/index/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "cloudwatch:PutMetricData"
                ],
                "Resource": "*"
            }
        ]
    }
    

    Note: Replace “{Region}”, “{Account}”, “{Stream Name}”, and “{Application Name}” in the ARNs with your own AWS region, AWS account number, Kinesis data stream name, and ConsumeKinesis Application Name property respectively.
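
    For example, with the hypothetical values us-east-1 for {Region}, 123456789012 for {Account}, my-stream for {Stream Name}, and nifi-consume-kinesis for {Application Name}, the resource ARNs in the policy would become:

    arn:aws:kinesis:us-east-1:123456789012:stream/my-stream
    arn:aws:kinesis:us-east-1:123456789012:stream/my-stream/consumer/*
    arn:aws:dynamodb:us-east-1:123456789012:table/nifi-consume-kinesis
    arn:aws:dynamodb:us-east-1:123456789012:table/nifi-consume-kinesis-WorkerMetricStats
    arn:aws:dynamodb:us-east-1:123456789012:table/nifi-consume-kinesis-CoordinatorState
    arn:aws:dynamodb:us-east-1:123456789012:table/nifi-consume-kinesis/index/*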

    Consumer Type

    Comparison of the different Consumer Types, from the Amazon Kinesis Data Streams documentation:

    • Read throughput
      Shared throughput consumers (without enhanced fan-out): Fixed at a total of 2 MB/sec per shard. If there are multiple consumers reading from the same shard, they all share this throughput; the sum of the throughputs they receive from the shard does not exceed 2 MB/sec.
      Enhanced fan-out consumers: Scales as consumers register to use enhanced fan-out. Each consumer registered to use enhanced fan-out receives its own read throughput per shard, up to 2 MB/sec, independently of other consumers.
    • Message propagation delay
      Shared throughput consumers (without enhanced fan-out): An average of around 200 ms if you have one consumer reading from the stream. This average goes up to around 1000 ms if you have five consumers.
      Enhanced fan-out consumers: Typically an average of 70 ms, whether you have one consumer or five consumers.
    • Cost
      Shared throughput consumers (without enhanced fan-out): Not applicable.
      Enhanced fan-out consumers: There is a data retrieval cost and a consumer-shard hour cost. For more information, see Amazon Kinesis Data Streams Pricing.

    Record processing

    When the Processing Strategy property is set to RECORD, ConsumeKinesis operates in Record mode. In this mode, the processor reads records from the Kinesis stream using the configured Record Reader and writes them to FlowFiles using the configured Record Writer.

    The processor tries to optimize the number of FlowFiles created by batching multiple records with the same schema into a single FlowFile.
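
    For illustration, assuming a JsonRecordSetWriter and two consecutive Kinesis records that share the same schema, a single FlowFile could contain both records, similar to the following (all values are hypothetical):

    [
      {
        "address": "1234 First Street",
        "zip": "12345"
      },
      {
        "address": "5678 Second Street",
        "zip": "67890"
      }
    ]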

    Schema changes

    ConsumeKinesis supports dynamically changing Kinesis record schemas. When a record with a new schema is encountered, the currently open FlowFile is closed and a new FlowFile is created for the new schema. As a result, the processor preserves record ordering within a single shard, even when record schemas change.

    Note that the processor relies on the Record Reader to provide the correct schema for each record. Using a reader with schema inference may produce many different schemas, which can lead to excessive FlowFile creation. If performance is a concern, it is recommended to use a reader with a predefined schema or a schema registry.
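
    For example, a reader configured with an explicit Avro schema such as the following (a minimal sketch matching the sample record used in the examples below; adapt the field names and types to your own data) yields a single, stable schema regardless of individual record contents:

    {
      "type": "record",
      "name": "sampleRecord",
      "namespace": "org.apache.nifi",
      "fields": [
        { "name": "address", "type": ["null", "string"] },
        { "name": "zip", "type": ["null", "string"] },
        {
          "name": "account",
          "type": ["null", {
            "type": "record",
            "name": "accountType",
            "fields": [
              { "name": "name", "type": ["null", "string"] },
              { "name": "number", "type": ["null", "string"] }
            ]
          }]
        }
      ]
    }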

    Output Strategies

    This processor offers multiple strategies, configured via the Output Strategy processor property, for converting Kinesis records into FlowFiles.

    • Use Content as Value (the default) writes only a Kinesis record value to a FlowFile.
    • Use Wrapper writes a Kinesis Record value as well as metadata into separate fields of a FlowFile record.
    • Inject Metadata writes a Kinesis Record value to a FlowFile record and adds a sub-record to it with metadata.

    The written metadata includes the following fields:

    • stream: The name of the Kinesis stream the record was received from.
    • shardId: The identifier of the shard the record was received from.
    • sequenceNumber: The sequence number of the record.
    • subSequenceNumber: The subsequence number of the record, used when multiple smaller records are aggregated into a single Kinesis record. If a record was not part of a batch, this value will be 0.
    • shardedSequenceNumber: A combination of the sequence number and subsequence number. This can be used to uniquely identify a record within a shard.
    • partitionKey: The partition key of the record.
    • approximateArrival: The approximate arrival timestamp of the record (in milliseconds since epoch).

    The record schema that is used when Use Wrapper is selected is as follows (in Avro format):

    {
      "type": "record",
      "name": "nifiRecord",
      "namespace": "org.apache.nifi",
      "fields": [
        {
          "name": "value",
          "type": [
            {
              < Fields as determined by the Record Reader for a Kinesis message >
            },
            "null"
          ]
        },
        {
          "name": "kinesisMetadata",
          "type": [
            {
              "type": "record",
              "name": "metadataType",
              "fields": [
                { "name": "stream", "type": ["string", "null"] },
                { "name": "shardId", "type": ["string", "null"] },
                { "name": "sequenceNumber", "type": ["string", "null"] },
                { "name": "subSequenceNumber", "type": ["long", "null"] },
                { "name": "shardedSequenceNumber", "type": ["string", "null"] },
                { "name": "partitionKey", "type": ["string", "null"] },
                { "name": "approximateArrival", "type": [ { "type": "long", "logicalType": "timestamp-millis" }, "null" ] }
              ]
            },
            "null"
          ]
        }
      ]
    }
    

    The record schema that is used when “Inject Metadata” is selected is as follows (in Avro format):

    {
      "type": "record",
      "name": "nifiRecord",
      "namespace": "org.apache.nifi",
      "fields": [
        < Fields as determined by the Record Reader for a Kinesis message >,
        {
          "name": "kinesisMetadata",
          "type": [
            {
              "type": "record",
              "name": "metadataType",
              "fields": [
                { "name": "stream", "type": ["string", "null"] },
                { "name": "shardId", "type": ["string", "null"] },
                { "name": "sequenceNumber", "type": ["string", "null"] },
                { "name": "subSequenceNumber", "type": ["long", "null"] },
                { "name": "shardedSequenceNumber", "type": ["string", "null"] },
                { "name": "partitionKey", "type": ["string", "null"] },
                { "name": "approximateArrival", "type": [ { "type": "long", "logicalType": "timestamp-millis" }, "null" ] }
              ]
            },
            "null"
          ]
        }
      ]
    }
    

    Here is an example of FlowFile content that is emitted by JsonRecordSetWriter when strategy Use Wrapper is selected:

    [
      {
        "value": {
          "address": "1234 First Street",
          "zip": "12345",
          "account": {
            "name": "Acme",
            "number": "AC1234"
          }
        },
        "kinesisMetadata" : {
          "stream" : "stream-name",
          "shardId" : "shardId-000000000000",
          "sequenceNumber" : "123456789",
          "subSequenceNumber" : 3,
          "shardedSequenceNumber" : "12345678900000000000000000003",
          "partitionKey" : "123",
          "approximateArrival" : 1756459596788
        }
      }
    ]
    

    Here is an example of FlowFile content that is emitted by JsonRecordSetWriter when strategy Inject Metadata is selected:

    [
      {
        "address": "1234 First Street",
        "zip": "12345",
        "account": {
          "name": "Acme",
          "number": "AC1234"
        },
        "kinesisMetadata" : {
          "stream" : "stream-name",
          "shardId" : "shardId-000000000000",
          "sequenceNumber" : "123456789",
          "subSequenceNumber" : 3,
          "shardedSequenceNumber" : "12345678900000000000000000003",
          "partitionKey" : "123",
          "approximateArrival" : 1756459596788
        }
      }
    ]
    
Properties
System Resource Considerations
CPU: The processor uses additional CPU resources when consuming data from Kinesis. Consumption starts immediately after the processor is scheduled and ends only when the processor is stopped.
NETWORK: The processor continually polls for new records, requesting up to a maximum number of records/bytes per call. This can result in sustained network usage.
MEMORY: ConsumeKinesis buffers Kinesis records in memory until they can be processed. The maximum size of the buffer is controlled by the 'Max Bytes to Buffer' property. In addition, the processor may cache some amount of data for each shard when the buffer is full.
Relationships
success: FlowFiles that are created when records are successfully read from Kinesis and parsed
Writes Attributes
aws.kinesis.stream.name: The name of the Kinesis stream from which all Kinesis records in the FlowFile were read
aws.kinesis.shard.id: The ID of the shard from which all Kinesis records in the FlowFile were read
aws.kinesis.partition.key: The partition key of the last Kinesis record in the FlowFile
aws.kinesis.first.sequence.number: The sequence number of the first Kinesis record in the FlowFile
aws.kinesis.first.subsequence.number: The subsequence number of the first Kinesis record in the FlowFile. Generated by the KPL when aggregating records into a single Kinesis record
aws.kinesis.last.sequence.number: The sequence number of the last Kinesis record in the FlowFile
aws.kinesis.last.subsequence.number: The subsequence number of the last Kinesis record in the FlowFile. Generated by the KPL when aggregating records into a single Kinesis record
aws.kinesis.approximate.arrival.timestamp.ms: The approximate arrival timestamp (milliseconds since epoch) of the last Kinesis record in the FlowFile
mime.type: The MIME type specified by the Record Writer (if configured)
record.count: The number of records written to the FlowFile by the Record Writer (if configured)
record.error.message: On failure, the error message encountered by the Record Reader or Record Writer (if configured)
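
These attributes can be used with NiFi Expression Language in downstream processors. For example, a RouteOnAttribute property with a value such as ${aws.kinesis.shard.id:equals('shardId-000000000000')} (using the sample shard ID from the examples above) would route only FlowFiles whose records were read from that shard.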