-
Processors
- AttributeRollingWindow
- AttributesToCSV
- AttributesToJSON
- CalculateRecordStats
- CaptureChangeMySQL
- CompressContent
- ConnectWebSocket
- ConsumeAMQP
- ConsumeAzureEventHub
- ConsumeBoxEnterpriseEvents
- ConsumeBoxEvents
- ConsumeElasticsearch
- ConsumeGCPubSub
- ConsumeIMAP
- ConsumeJMS
- ConsumeKafka
- ConsumeKinesis
- ConsumeKinesisStream
- ConsumeMQTT
- ConsumePOP3
- ConsumeSlack
- ConsumeTwitter
- ConsumeWindowsEventLog
- ControlRate
- ConvertCharacterSet
- ConvertRecord
- CopyAzureBlobStorage_v12
- CopyS3Object
- CountText
- CreateBoxFileMetadataInstance
- CreateBoxMetadataTemplate
- CryptographicHashContent
- DebugFlow
- DecryptContentAge
- DecryptContentPGP
- DeduplicateRecord
- DeleteAzureBlobStorage_v12
- DeleteAzureDataLakeStorage
- DeleteBoxFileMetadataInstance
- DeleteByQueryElasticsearch
- DeleteDynamoDB
- DeleteFile
- DeleteGCSObject
- DeleteGridFS
- DeleteMongo
- DeleteS3Object
- DeleteSFTP
- DeleteSQS
- DetectDuplicate
- DistributeLoad
- DuplicateFlowFile
- EncodeContent
- EncryptContentAge
- EncryptContentPGP
- EnforceOrder
- EvaluateJsonPath
- EvaluateXPath
- EvaluateXQuery
- ExecuteGroovyScript
- ExecuteProcess
- ExecuteScript
- ExecuteSQL
- ExecuteSQLRecord
- ExecuteStreamCommand
- ExtractAvroMetadata
- ExtractEmailAttachments
- ExtractEmailHeaders
- ExtractGrok
- ExtractHL7Attributes
- ExtractRecordSchema
- ExtractStructuredBoxFileMetadata
- ExtractText
- FetchAzureBlobStorage_v12
- FetchAzureDataLakeStorage
- FetchBoxFile
- FetchBoxFileInfo
- FetchBoxFileMetadataInstance
- FetchBoxFileRepresentation
- FetchDistributedMapCache
- FetchDropbox
- FetchFile
- FetchFTP
- FetchGCSObject
- FetchGoogleDrive
- FetchGridFS
- FetchS3Object
- FetchSFTP
- FetchSmb
- FilterAttribute
- FlattenJson
- ForkEnrichment
- ForkRecord
- GenerateFlowFile
- GenerateRecord
- GenerateTableFetch
- GeoEnrichIP
- GeoEnrichIPRecord
- GeohashRecord
- GetAwsPollyJobStatus
- GetAwsTextractJobStatus
- GetAwsTranscribeJobStatus
- GetAwsTranslateJobStatus
- GetAzureEventHub
- GetAzureQueueStorage_v12
- GetBoxFileCollaborators
- GetBoxGroupMembers
- GetDynamoDB
- GetElasticsearch
- GetFile
- GetFileResource
- GetFTP
- GetGcpVisionAnnotateFilesOperationStatus
- GetGcpVisionAnnotateImagesOperationStatus
- GetHubSpot
- GetMongo
- GetMongoRecord
- GetS3ObjectMetadata
- GetS3ObjectTags
- GetSFTP
- GetShopify
- GetSmbFile
- GetSNMP
- GetSplunk
- GetSQS
- GetWorkdayReport
- GetZendesk
- HandleHttpRequest
- HandleHttpResponse
- IdentifyMimeType
- InvokeHTTP
- InvokeScriptedProcessor
- ISPEnrichIP
- JoinEnrichment
- JoltTransformJSON
- JoltTransformRecord
- JSLTTransformJSON
- JsonQueryElasticsearch
- ListAzureBlobStorage_v12
- ListAzureDataLakeStorage
- ListBoxFile
- ListBoxFileInfo
- ListBoxFileMetadataInstances
- ListBoxFileMetadataTemplates
- ListDatabaseTables
- ListDropbox
- ListenFTP
- ListenHTTP
- ListenOTLP
- ListenSlack
- ListenSyslog
- ListenTCP
- ListenTrapSNMP
- ListenUDP
- ListenUDPRecord
- ListenWebSocket
- ListFile
- ListFTP
- ListGCSBucket
- ListGoogleDrive
- ListS3
- ListSFTP
- ListSmb
- LogAttribute
- LogMessage
- LookupAttribute
- LookupRecord
- MergeContent
- MergeRecord
- ModifyBytes
- ModifyCompression
- MonitorActivity
- MoveAzureDataLakeStorage
- Notify
- PackageFlowFile
- PaginatedJsonQueryElasticsearch
- ParseEvtx
- ParseNetflowv5
- ParseSyslog
- ParseSyslog5424
- PartitionRecord
- PublishAMQP
- PublishGCPubSub
- PublishJMS
- PublishKafka
- PublishMQTT
- PublishSlack
- PutAzureBlobStorage_v12
- PutAzureCosmosDBRecord
- PutAzureDataExplorer
- PutAzureDataLakeStorage
- PutAzureEventHub
- PutAzureQueueStorage_v12
- PutBigQuery
- PutBoxFile
- PutCloudWatchMetric
- PutDatabaseRecord
- PutDistributedMapCache
- PutDropbox
- PutDynamoDB
- PutDynamoDBRecord
- PutElasticsearchJson
- PutElasticsearchRecord
- PutEmail
- PutFile
- PutFTP
- PutGCSObject
- PutGoogleDrive
- PutGridFS
- PutIcebergRecord
- PutKinesisFirehose
- PutKinesisStream
- PutLambda
- PutMongo
- PutMongoBulkOperations
- PutMongoRecord
- PutRecord
- PutRedisHashRecord
- PutS3Object
- PutSalesforceObject
- PutSFTP
- PutSmbFile
- PutSNS
- PutSplunk
- PutSplunkHTTP
- PutSQL
- PutSQS
- PutSyslog
- PutTCP
- PutUDP
- PutWebSocket
- PutZendeskTicket
- QueryAirtableTable
- QueryAzureDataExplorer
- QueryDatabaseTable
- QueryDatabaseTableRecord
- QueryRecord
- QuerySalesforceObject
- QuerySplunkIndexingStatus
- RemoveRecordField
- RenameRecordField
- ReplaceText
- ReplaceTextWithMapping
- RetryFlowFile
- RouteHL7
- RouteOnAttribute
- RouteOnContent
- RouteText
- RunMongoAggregation
- SampleRecord
- ScanAttribute
- ScanContent
- ScriptedFilterRecord
- ScriptedPartitionRecord
- ScriptedTransformRecord
- ScriptedValidateRecord
- SearchElasticsearch
- SegmentContent
- SendTrapSNMP
- SetSNMP
- SignContentPGP
- SplitAvro
- SplitContent
- SplitExcel
- SplitJson
- SplitPCAP
- SplitRecord
- SplitText
- SplitXml
- StartAwsPollyJob
- StartAwsTextractJob
- StartAwsTranscribeJob
- StartAwsTranslateJob
- StartGcpVisionAnnotateFilesOperation
- StartGcpVisionAnnotateImagesOperation
- TagS3Object
- TailFile
- TransformXml
- UnpackContent
- UpdateAttribute
- UpdateBoxFileMetadataInstance
- UpdateByQueryElasticsearch
- UpdateCounter
- UpdateDatabaseTable
- UpdateGauge
- UpdateRecord
- ValidateCsv
- ValidateJson
- ValidateRecord
- ValidateXml
- VerifyContentMAC
- VerifyContentPGP
- Wait
-
Controller Services
- ADLSCredentialsControllerService
- ADLSCredentialsControllerServiceLookup
- ADLSIcebergFileIOProvider
- AmazonGlueEncodedSchemaReferenceReader
- AmazonGlueSchemaRegistry
- AmazonMSKConnectionService
- ApicurioSchemaRegistry
- AvroReader
- AvroRecordSetWriter
- AvroSchemaRegistry
- AWSCredentialsProviderControllerService
- AwsRdsIamDatabasePasswordProvider
- AzureBlobStorageFileResourceService
- AzureCosmosDBClientService
- AzureDataLakeStorageFileResourceService
- AzureEventHubRecordSink
- AzureStorageCredentialsControllerService_v12
- AzureStorageCredentialsControllerServiceLookup_v12
- CEFReader
- ConfluentEncodedSchemaReferenceReader
- ConfluentEncodedSchemaReferenceWriter
- ConfluentProtobufMessageNameResolver
- ConfluentSchemaRegistry
- CSVReader
- CSVRecordLookupService
- CSVRecordSetWriter
- DatabaseRecordLookupService
- DatabaseRecordSink
- DatabaseTableSchemaRegistry
- DBCPConnectionPool
- DBCPConnectionPoolLookup
- DeveloperBoxClientService
- DistributedMapCacheLookupService
- ElasticSearchClientServiceImpl
- ElasticSearchLookupService
- ElasticSearchStringLookupService
- EmailRecordSink
- EmbeddedHazelcastCacheManager
- ExcelReader
- ExternalHazelcastCacheManager
- FreeFormTextRecordSetWriter
- GCPCredentialsControllerService
- GCSFileResourceService
- GCSIcebergFileIOProvider
- GrokReader
- HazelcastMapCacheClient
- HikariCPConnectionPool
- HttpRecordSink
- IPLookupService
- JettyWebSocketClient
- JettyWebSocketServer
- JMSConnectionFactoryProvider
- JndiJmsConnectionFactoryProvider
- JsonConfigBasedBoxClientService
- JsonPathReader
- JsonRecordSetWriter
- JsonTreeReader
- JWTBearerOAuth2AccessTokenProvider
- Kafka3ConnectionService
- KerberosKeytabUserService
- KerberosPasswordUserService
- KerberosTicketCacheUserService
- LoggingRecordSink
- MapCacheClientService
- MapCacheServer
- MongoDBControllerService
- MongoDBLookupService
- ParquetIcebergWriter
- PEMEncodedSSLContextProvider
- PropertiesFileLookupService
- ProtobufReader
- ReaderLookup
- RecordSetWriterLookup
- RecordSinkServiceLookup
- RedisConnectionPoolService
- RedisDistributedMapCacheClientService
- RESTIcebergCatalog
- RestLookupService
- S3FileResourceService
- S3IcebergFileIOProvider
- ScriptedLookupService
- ScriptedReader
- ScriptedRecordSetWriter
- ScriptedRecordSink
- SetCacheClientService
- SetCacheServer
- SimpleCsvFileLookupService
- SimpleDatabaseLookupService
- SimpleKeyValueLookupService
- SimpleRedisDistributedMapCacheClientService
- SimpleScriptedLookupService
- SiteToSiteReportingRecordSink
- SlackRecordSink
- SmbjClientProviderService
- StandardAzureCredentialsControllerService
- StandardAzureIdentityFederationTokenProvider
- StandardDatabaseDialectService
- StandardDropboxCredentialService
- StandardFileResourceService
- StandardHashiCorpVaultClientService
- StandardHttpContextMap
- StandardJsonSchemaRegistry
- StandardKustoIngestService
- StandardKustoQueryService
- StandardOauth2AccessTokenProvider
- StandardPGPPrivateKeyService
- StandardPGPPublicKeyService
- StandardPrivateKeyService
- StandardProtobufReader
- StandardProxyConfigurationService
- StandardRestrictedSSLContextService
- StandardS3EncryptionService
- StandardSSLContextService
- StandardWebClientServiceProvider
- Syslog5424Reader
- SyslogReader
- UDPEventRecordSink
- VolatileSchemaCache
- WindowsEventLogReader
- XMLFileLookupService
- XMLReader
- XMLRecordSetWriter
- YamlTreeReader
- ZendeskRecordSink
ConsumeKinesis 2.9.0
- Bundle
- org.apache.nifi | nifi-aws-kinesis-nar
- Description
- Consumes records from an Amazon Kinesis Data Stream. Uses DynamoDB-based checkpointing for reliable resumption after restarts. Note: when a shard is split or multiple shards are merged, this processor will consume from child and parent shards concurrently. It does not wait for parent shards to be fully consumed before reading child shards, so record ordering is not guaranteed across a split or merge boundary.
- Tags
- amazon, aws, consume, kinesis, record, stream
- Input Requirement
- FORBIDDEN
- Supports Sensitive Dynamic Properties
- false
-
Additional Details for ConsumeKinesis 2.9.0
ConsumeKinesis
IAM permissions required for ConsumeKinesis
You must add the following permissions to the IAM role or user configured in AWS Credentials Provider Service.
CloudWatch permissions are needed only when Metrics Publishing is set to CloudWatch.
| Service | Actions | Resources (ARNs) | Purpose |
|---|---|---|---|
| Amazon Kinesis Data Streams | DescribeStream, DescribeStreamSummary, RegisterStreamConsumer | Kinesis data stream from which ConsumeKinesis will process the data: arn:aws:kinesis:{Region}:{Account}:stream/{Stream Name} | Before attempting to read records, the consumer checks that the data stream exists, that it is active, and that the shards are contained in the data stream. Also registers consumers to a shard. |
| Amazon Kinesis Data Streams | GetRecords, GetShardIterator, ListShards | Kinesis data stream from which ConsumeKinesis will process the data: arn:aws:kinesis:{Region}:{Account}:stream/{Stream Name} | Reads records from a shard. |
| Amazon Kinesis Data Streams | SubscribeToShard, DescribeStreamConsumer | Kinesis data stream from which ConsumeKinesis will process the data. Add these actions only if you use enhanced fan-out (EFO) consumers: arn:aws:kinesis:{Region}:{Account}:stream/{Stream Name}/consumer/* | Subscribes to a shard for enhanced fan-out (EFO) consumers. |
| Amazon DynamoDB | CreateTable, DescribeTable, UpdateTable, Scan, GetItem, PutItem, UpdateItem, DeleteItem | Lease table (metadata table in DynamoDB created by ConsumeKinesis): arn:aws:dynamodb:{Region}:{Account}:table/{Application Name} | Required for ConsumeKinesis to manage the lease table created in DynamoDB. |
| Amazon DynamoDB | CreateTable, DescribeTable, Scan, GetItem, PutItem, UpdateItem, DeleteItem | Worker metrics and coordinator state tables (metadata tables in DynamoDB created by ConsumeKinesis): arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}-WorkerMetricStats and arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}-CoordinatorState | Required for ConsumeKinesis to manage the worker metrics and coordinator state metadata tables in DynamoDB. |
| Amazon DynamoDB | Query | Global secondary index on the lease table: arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}/index/* | Required for ConsumeKinesis to read the global secondary index of the lease table created in DynamoDB. |
| Amazon CloudWatch | PutMetricData | * | Uploads metrics to CloudWatch that are useful for monitoring the application. The asterisk (*) is used because there is no specific resource in CloudWatch on which the PutMetricData action is invoked. |

The following is an example policy document for ConsumeKinesis:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamSummary",
        "kinesis:RegisterStreamConsumer",
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:ListShards"
      ],
      "Resource": ["arn:aws:kinesis:{Region}:{Account}:stream/{Stream Name}"]
    },
    {
      "Effect": "Allow",
      "Action": ["kinesis:SubscribeToShard", "kinesis:DescribeStreamConsumer"],
      "Resource": ["arn:aws:kinesis:{Region}:{Account}:stream/{Stream Name}/consumer/*"]
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:UpdateTable",
        "dynamodb:GetItem",
        "dynamodb:UpdateItem",
        "dynamodb:PutItem",
        "dynamodb:DeleteItem",
        "dynamodb:Scan"
      ],
      "Resource": ["arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}"]
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:UpdateItem",
        "dynamodb:PutItem",
        "dynamodb:DeleteItem",
        "dynamodb:Scan"
      ],
      "Resource": [
        "arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}-WorkerMetricStats",
        "arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}-CoordinatorState"
      ]
    },
    {
      "Effect": "Allow",
      "Action": ["dynamodb:Query"],
      "Resource": ["arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}/index/*"]
    },
    {
      "Effect": "Allow",
      "Action": ["cloudwatch:PutMetricData"],
      "Resource": "*"
    }
  ]
}
```

Note: Replace "{Region}", "{Account}", "{Stream Name}", and "{Application Name}" in the ARNs with your own AWS region, AWS account number, Kinesis data stream name, and the value of the ConsumeKinesis Application Name property, respectively.

Consumer Type
Comparison of different Consumer Types from Amazon Kinesis Streams documentation:
| Characteristics | Shared throughput consumers (without enhanced fan-out) | Enhanced fan-out consumers |
|---|---|---|
| Read throughput | Fixed at a total of 2 MB/sec per shard. If multiple consumers read from the same shard, they all share this throughput; the sum of the throughputs they receive from the shard does not exceed 2 MB/sec. | Scales as consumers register to use enhanced fan-out. Each consumer registered to use enhanced fan-out receives its own read throughput per shard, up to 2 MB/sec, independently of other consumers. |
| Message propagation delay | An average of around 200 ms with one consumer reading from the stream, rising to around 1000 ms with five consumers. | Typically an average of 70 ms, whether you have one consumer or five. |
| Cost | Not applicable. | There is a data retrieval cost and a consumer-shard hour cost. For more information, see Amazon Kinesis Data Streams Pricing. |

Record processing
When Processing Strategy property is set to RECORD, ConsumeKinesis operates in Record mode. In this mode, the processor reads records from Kinesis streams using the configured Record Reader, and writes them to FlowFiles using the configured Record Writer.
The processor tries to optimize the number of FlowFiles created by batching multiple records with the same schema into a single FlowFile.
Schema changes
ConsumeKinesis supports dynamically changing Kinesis record schemas. When a record with a new schema is encountered, the currently open FlowFile is closed and a new FlowFile is created for the new schema. As a result, the processor preserves record ordering within a single shard even when record schemas change.
Note that the processor relies on the Record Reader to provide the correct schema for each record. A reader that uses schema inference may produce many distinct schemas, which can lead to excessive FlowFile creation. If performance is a concern, use a reader with a predefined schema or a schema registry.
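The close-and-reopen behavior described above can be sketched as follows. This is an illustrative model of the documented batching, not the processor's actual implementation; the `(schema_id, payload)` tuple shape and the `batch_by_schema` helper are assumptions made for the sketch:

```python
# Illustrative sketch of schema-aware batching: records are appended to the
# current batch until the schema changes, at which point the open batch is
# closed (modeling a finished FlowFile) and a new one begins. Record order
# within the shard is preserved across schema changes.
def batch_by_schema(records):
    """records: iterable of (schema_id, payload) tuples, in shard order."""
    batches = []          # each entry models one FlowFile: (schema_id, [payloads])
    current_schema = None
    current_batch = []
    for schema_id, payload in records:
        if current_batch and schema_id != current_schema:
            batches.append((current_schema, current_batch))
            current_batch = []
        current_schema = schema_id
        current_batch.append(payload)
    if current_batch:
        batches.append((current_schema, current_batch))
    return batches

# Two schema changes in shard order yield three FlowFiles, order preserved.
records = [("v1", "a"), ("v1", "b"), ("v2", "c"), ("v1", "d")]
print(batch_by_schema(records))
# [('v1', ['a', 'b']), ('v2', ['c']), ('v1', ['d'])]
```

The same effect explains why schema inference can be costly: every distinct inferred schema forces a batch boundary.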
Output Strategies
This processor offers multiple strategies configured via processor property Output Strategy for converting Kinesis records into FlowFiles.
- Use Content as Value (the default) writes only a Kinesis record value to a FlowFile.
- Use Wrapper writes a Kinesis Record value as well as metadata into separate fields of a FlowFile record.
- Inject Metadata writes a Kinesis Record value to a FlowFile record and adds a sub-record to it with metadata.
The written metadata includes the following fields:
- stream: The name of the Kinesis stream the record was received from.
- shardId: The identifier of the shard the record was received from.
- sequenceNumber: The sequence number of the record.
- subSequenceNumber: The subsequence number of the record, used when multiple smaller records are aggregated into a single Kinesis record. If a record was not part of an aggregate, this value will be 0.
- shardedSequenceNumber: A combination of the sequence number and subsequence number. This can be used to uniquely identify a record within a shard.
- partitionKey: The partition key of the record.
- approximateArrival: The approximate arrival timestamp of the record (in milliseconds since epoch).
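Judging from the sample output later in this section, the shardedSequenceNumber appears to be the sequence number with the subsequence number appended as a zero-padded suffix. A sketch of that combination follows; the 20-digit padding width is an assumption inferred from the sample, not a documented contract:

```python
# Hypothetical reconstruction of shardedSequenceNumber: the record's sequence
# number concatenated with its zero-padded subsequence number. The 20-digit
# padding width is inferred from the sample output and is an assumption.
def sharded_sequence_number(sequence_number: str, sub_sequence_number: int) -> str:
    return f"{sequence_number}{sub_sequence_number:020d}"

print(sharded_sequence_number("123456789", 3))
```

Because the subsequence suffix has a fixed width, these values sort lexicographically in record order within a shard, which is what makes them usable as unique per-shard identifiers.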
The record schema that is used when Use Wrapper is selected is as follows (in Avro format):
```json
{
  "type": "record",
  "name": "nifiRecord",
  "namespace": "org.apache.nifi",
  "fields": [
    {
      "name": "value",
      "type": [
        { < Fields as determined by the Record Reader for a Kinesis message > },
        "null"
      ]
    },
    {
      "name": "kinesisMetadata",
      "type": [
        {
          "type": "record",
          "name": "metadataType",
          "fields": [
            { "name": "stream", "type": ["string", "null"] },
            { "name": "shardId", "type": ["string", "null"] },
            { "name": "sequenceNumber", "type": ["string", "null"] },
            { "name": "subSequenceNumber", "type": ["long", "null"] },
            { "name": "shardedSequenceNumber", "type": ["string", "null"] },
            { "name": "partitionKey", "type": ["string", "null"] },
            { "name": "approximateArrival", "type": [ { "type": "long", "logicalType": "timestamp-millis" }, "null" ] }
          ]
        },
        "null"
      ]
    }
  ]
}
```

The record schema that is used when Inject Metadata is selected is as follows (in Avro format):
```json
{
  "type": "record",
  "name": "nifiRecord",
  "namespace": "org.apache.nifi",
  "fields": [
    < Fields as determined by the Record Reader for a Kinesis message >,
    {
      "name": "kinesisMetadata",
      "type": [
        {
          "type": "record",
          "name": "metadataType",
          "fields": [
            { "name": "stream", "type": ["string", "null"] },
            { "name": "shardId", "type": ["string", "null"] },
            { "name": "sequenceNumber", "type": ["string", "null"] },
            { "name": "subSequenceNumber", "type": ["long", "null"] },
            { "name": "shardedSequenceNumber", "type": ["string", "null"] },
            { "name": "partitionKey", "type": ["string", "null"] },
            { "name": "approximateArrival", "type": [ { "type": "long", "logicalType": "timestamp-millis" }, "null" ] }
          ]
        },
        "null"
      ]
    }
  ]
}
```

Here is an example of FlowFile content emitted by JsonRecordSetWriter when the Use Wrapper strategy is selected:
```json
[
  {
    "value": {
      "address": "1234 First Street",
      "zip": "12345",
      "account": { "name": "Acme", "number": "AC1234" }
    },
    "kinesisMetadata": {
      "stream": "stream-name",
      "shardId": "shardId-000000000000",
      "sequenceNumber": "123456789",
      "subSequenceNumber": 3,
      "shardedSequenceNumber": "12345678900000000000000000003",
      "partitionKey": "123",
      "approximateArrival": 1756459596788
    }
  }
]
```

Here is an example of FlowFile content emitted by JsonRecordSetWriter when the Inject Metadata strategy is selected:
```json
[
  {
    "address": "1234 First Street",
    "zip": "12345",
    "account": { "name": "Acme", "number": "AC1234" },
    "kinesisMetadata": {
      "stream": "stream-name",
      "shardId": "shardId-000000000000",
      "sequenceNumber": "123456789",
      "subSequenceNumber": 3,
      "shardedSequenceNumber": "12345678900000000000000000003",
      "partitionKey": "123",
      "approximateArrival": 1756459596788
    }
  }
]
```
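Downstream of the processor, wrapper-style output is ordinary JSON and can be split back into payload and metadata with standard tooling. A sketch, using field names taken from the Use Wrapper sample above (the sample content itself is inlined here for illustration):

```python
import json

# Separate Use Wrapper output into the original record payloads and their
# Kinesis metadata. The structure mirrors the JsonRecordSetWriter sample
# shown earlier in this section.
flowfile_content = """
[
  {
    "value": {"address": "1234 First Street", "zip": "12345"},
    "kinesisMetadata": {
      "stream": "stream-name",
      "shardId": "shardId-000000000000",
      "sequenceNumber": "123456789",
      "subSequenceNumber": 3,
      "shardedSequenceNumber": "12345678900000000000000000003",
      "partitionKey": "123",
      "approximateArrival": 1756459596788
    }
  }
]
"""

records = json.loads(flowfile_content)
for record in records:
    payload = record["value"]             # the original Kinesis record body
    metadata = record["kinesisMetadata"]  # stream/shard/sequence details
    print(metadata["shardId"], payload["zip"])
```

With Inject Metadata, the same loop would read the payload fields from the top level of each record instead of from a nested `value` field.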
-
Application Name
The name of the Kinesis application. Used as the DynamoDB table name for checkpoint storage. When Consumer Type is Enhanced Fan-Out, this value is also used as the registered consumer name. This value should be unique for the stream.
- Display Name
- Application Name
- Description
- The name of the Kinesis application. Used as the DynamoDB table name for checkpoint storage. When Consumer Type is Enhanced Fan-Out, this value is also used as the registered consumer name. This value should be unique for the stream.
- API Name
- Application Name
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
AWS Credentials Provider Service
The Controller Service used to obtain AWS credentials provider.
- Display Name
- AWS Credentials Provider Service
- Description
- The Controller Service used to obtain AWS credentials provider.
- API Name
- AWS Credentials Provider Service
- Service Interface
- org.apache.nifi.processors.aws.credentials.provider.AwsCredentialsProviderService
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Consumer Type
Strategy for reading records from Amazon Kinesis streams.
- Display Name
- Consumer Type
- Description
- Strategy for reading records from Amazon Kinesis streams.
- API Name
- Consumer Type
- Default Value
- ENHANCED_FAN_OUT
- Allowable Values
-
- Shared Throughput
- Enhanced Fan-Out
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Custom Region
Custom region, e.g. a region of an AWS compatible service provider
- Display Name
- Custom Region
- Description
- Custom region, e.g. a region of an AWS compatible service provider
- API Name
- Custom Region
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- true
- Dependencies
-
- Region is set to any of [use-custom-region]
-
Endpoint Override URL
An optional endpoint override URL for both the Kinesis and DynamoDB clients.
- Display Name
- Endpoint Override URL
- Description
- An optional endpoint override URL for both the Kinesis and DynamoDB clients.
- API Name
- Endpoint Override URL
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Initial Stream Position
The position in the stream where the processor should start reading.
- Display Name
- Initial Stream Position
- Description
- The position in the stream where the processor should start reading.
- API Name
- Initial Stream Position
- Default Value
- TRIM_HORIZON
- Allowable Values
-
- Trim Horizon
- Latest
- At Timestamp
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Max Batch Duration
The maximum amount of time to spend consuming records in a single invocation before committing the session and checkpointing.
- Display Name
- Max Batch Duration
- Description
- The maximum amount of time to spend consuming records in a single invocation before committing the session and checkpointing.
- API Name
- Max Batch Duration
- Default Value
- 5 sec
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Max Batch Size
The maximum amount of data to consume in a single invocation before committing the session and checkpointing.
- Display Name
- Max Batch Size
- Description
- The maximum amount of data to consume in a single invocation before committing the session and checkpointing.
- API Name
- Max Batch Size
- Default Value
- 10 MB
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Max Records Per Request
The maximum number of records to retrieve per GetRecords call. Maximum is 10,000.
- Display Name
- Max Records Per Request
- Description
- The maximum number of records to retrieve per GetRecords call. Maximum is 10,000.
- API Name
- Max Records Per Request
- Default Value
- 100
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Consumer Type is set to any of [SHARED_THROUGHPUT]
-
Message Demarcator
Specifies the string (interpreted as UTF-8) used to separate multiple Kinesis messages within a single FlowFile when Processing Strategy is DEMARCATOR.
- Display Name
- Message Demarcator
- Description
- Specifies the string (interpreted as UTF-8) used to separate multiple Kinesis messages within a single FlowFile when Processing Strategy is DEMARCATOR.
- API Name
- Message Demarcator
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Processing Strategy is set to any of [DEMARCATOR]
-
Output Strategy
The format used to output a Kinesis Record into a FlowFile Record.
- Display Name
- Output Strategy
- Description
- The format used to output a Kinesis Record into a FlowFile Record.
- API Name
- Output Strategy
- Default Value
- USE_VALUE
- Allowable Values
-
- Use Content as Value
- Use Wrapper
- Inject Metadata
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Processing Strategy is set to any of [RECORD]
-
Processing Strategy
Strategy for processing Kinesis Records and writing serialized output to FlowFiles.
- Display Name
- Processing Strategy
- Description
- Strategy for processing Kinesis Records and writing serialized output to FlowFiles.
- API Name
- Processing Strategy
- Default Value
- FLOW_FILE
- Allowable Values
-
- FLOW_FILE
- LINE_DELIMITED
- RECORD
- DEMARCATOR
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Proxy Configuration Service
Specifies the Proxy Configuration Controller Service to proxy network requests.
- Display Name
- Proxy Configuration Service
- Description
- Specifies the Proxy Configuration Controller Service to proxy network requests.
- API Name
- Proxy Configuration Service
- Service Interface
- org.apache.nifi.proxy.ProxyConfigurationService
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Record Reader
The Record Reader to use for parsing the data received from Kinesis.
- Display Name
- Record Reader
- Description
- The Record Reader to use for parsing the data received from Kinesis.
- API Name
- Record Reader
- Service Interface
- org.apache.nifi.serialization.RecordReaderFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Processing Strategy is set to any of [RECORD]
-
Record Writer
The Record Writer to use for serializing records.
- Display Name
- Record Writer
- Description
- The Record Writer to use for serializing records.
- API Name
- Record Writer
- Service Interface
- org.apache.nifi.serialization.RecordSetWriterFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Processing Strategy is set to any of [RECORD]
-
Region
AWS Region in which the service is located
- Display Name
- Region
- Description
- AWS Region in which the service is located
- API Name
- Region
- Default Value
- us-west-2
- Allowable Values
-
- Africa (Cape Town)
- Asia Pacific (Hong Kong)
- Asia Pacific (Hyderabad)
- Asia Pacific (Jakarta)
- Asia Pacific (Malaysia)
- Asia Pacific (Melbourne)
- Asia Pacific (Mumbai)
- Asia Pacific (New Zealand)
- Asia Pacific (Osaka)
- Asia Pacific (Seoul)
- Asia Pacific (Singapore)
- Asia Pacific (Sydney)
- Asia Pacific (Taipei)
- Asia Pacific (Thailand)
- Asia Pacific (Tokyo)
- AWS European Sovereign Cloud (Germany)
- aws global region
- AWS GovCloud (US-East)
- AWS GovCloud (US-West)
- aws-cn global region
- aws-iso global region
- aws-iso-b global region
- aws-iso-e global region
- aws-iso-f global region
- aws-us-gov global region
- Canada (Central)
- Canada West (Calgary)
- China (Beijing)
- China (Ningxia)
- EU ISOE West
- Europe (Frankfurt)
- Europe (Ireland)
- Europe (London)
- Europe (Milan)
- Europe (Paris)
- Europe (Spain)
- Europe (Stockholm)
- Europe (Zurich)
- Israel (Tel Aviv)
- Mexico (Central)
- Middle East (Bahrain)
- Middle East (UAE)
- South America (Sao Paulo)
- US East (N. Virginia)
- US East (Ohio)
- US ISO East
- US ISO WEST
- US ISOB East (Ohio)
- US ISOB West
- US ISOF EAST
- US ISOF SOUTH
- US West (N. California)
- US West (Oregon)
- Use Custom Region
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Stream Name
The name of the Kinesis stream to consume from.
- Display Name
- Stream Name
- Description
- The name of the Kinesis stream to consume from.
- API Name
- Stream Name
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Stream Position Timestamp
Timestamp position in stream from which to start reading Kinesis Records. Must be in ISO 8601 format.
- Display Name
- Stream Position Timestamp
- Description
- Timestamp position in stream from which to start reading Kinesis Records. Must be in ISO 8601 format.
- API Name
- Stream Position Timestamp
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Initial Stream Position is set to any of [AT_TIMESTAMP]
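A value for the Stream Position Timestamp property can be produced with standard library tooling. The documentation above only states "ISO 8601 format", so treat the exact variant (precision, UTC offset vs. local time) as an assumption to verify against your processor version:

```python
from datetime import datetime, timezone

# Produce an ISO 8601 timestamp in UTC, suitable in form for the
# Stream Position Timestamp property. Which ISO 8601 variants the
# processor accepts is an assumption; the docs only say "ISO 8601".
position = datetime(2024, 5, 1, 12, 30, 0, tzinfo=timezone.utc).isoformat()
print(position)  # 2024-05-01T12:30:00+00:00
```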
| Resource | Description |
|---|---|
| CPU | The processor uses additional CPU resources when consuming data from Kinesis. |
| NETWORK | The processor will continually poll for new Records. |
| MEMORY | Records are fetched from Kinesis in the background and buffered in memory until they can be written to FlowFiles. Up to 200 fetch responses may be buffered per shard for both Shared Throughput and Enhanced Fan-Out. Each Shared Throughput response can contain up to the value of 'Max Records Per Request' records (default 100) and up to 10 MB, so the theoretical maximum is 20,000 records or approximately 2 GB per shard at default settings. Each Enhanced Fan-Out push event can hold up to roughly 2 MB, for a theoretical maximum of approximately 400 MB per shard. In practice the buffer is typically much smaller because fetch threads block when the queue is full and most responses are well below the maximum size. |
| Name | Description |
|---|---|
| success | FlowFiles that are created when records are successfully read from Kinesis and parsed |
| Name | Description |
|---|---|
| aws.kinesis.stream.name | The name of the Kinesis Stream from which records were read |
| aws.kinesis.shard.id | Shard ID from which records were read |
| aws.kinesis.partition.key | Partition key of the last Kinesis Record in the FlowFile |
| aws.kinesis.first.sequence.number | Sequence Number of the first Kinesis Record in the FlowFile |
| aws.kinesis.first.subsequence.number | Sub-Sequence Number of the first Kinesis Record in the FlowFile |
| aws.kinesis.last.sequence.number | Sequence Number of the last Kinesis Record in the FlowFile |
| aws.kinesis.last.subsequence.number | Sub-Sequence Number of the last Kinesis Record in the FlowFile |
| aws.kinesis.approximate.arrival.timestamp.ms | Approximate arrival timestamp associated with the Kinesis Record or records in the FlowFile |
| mime.type | Sets the mime.type attribute to the MIME Type specified by the Record Writer (if configured) |
| record.count | Number of records written to the FlowFile |
| record.error.message | Error message encountered by the Record Reader or Record Writer (if configured) |
| kinesis.millis.behind | How many milliseconds the consumer is behind the tip (latest record) of the stream |