ConsumeKinesis 2.7.0
- Bundle
- org.apache.nifi | nifi-aws-kinesis-nar
- Description
- Consumes data from the specified AWS Kinesis stream and outputs a FlowFile for every processed Record (raw), or a FlowFile for a batch of processed records if a Record Reader and Record Writer are configured. The processor may take a few minutes on the first start, and several seconds on subsequent starts, to initialize before it begins fetching data. Uses DynamoDB for checkpointing and coordination, and optionally CloudWatch for metrics.
- Tags
- amazon, aws, consume, kinesis, record, stream
- Input Requirement
- FORBIDDEN
- Supports Sensitive Dynamic Properties
- false
-
Additional Details for ConsumeKinesis 2.7.0
IAM permissions required for ConsumeKinesis
You must add the following permissions to the IAM role or user configured in the AWS Credentials Provider Service.
CloudWatch permissions are needed only when Metrics Publishing is set to CloudWatch.
| Service | Actions | Resources (ARNs) | Purpose |
|---|---|---|---|
| Amazon Kinesis Data Streams | DescribeStream, DescribeStreamSummary, RegisterStreamConsumer | Kinesis data stream from which ConsumeKinesis will process the data: `arn:aws:kinesis:{Region}:{Account}:stream/{Stream Name}` | Before attempting to read records, the consumer checks if the data stream exists, if it is active, and if the shards are contained in the data stream. Registers consumers to a shard. |
| Amazon Kinesis Data Streams | GetRecords, GetShardIterator, ListShards | Kinesis data stream from which ConsumeKinesis will process the data: `arn:aws:kinesis:{Region}:{Account}:stream/{Stream Name}` | Reads records from a shard. |
| Amazon Kinesis Data Streams | SubscribeToShard, DescribeStreamConsumer | Kinesis data stream from which ConsumeKinesis will process the data (add these actions only if you use enhanced fan-out (EFO) consumers): `arn:aws:kinesis:{Region}:{Account}:stream/{Stream Name}/consumer/*` | Subscribes to a shard for enhanced fan-out (EFO) consumers. |
| Amazon DynamoDB | CreateTable, DescribeTable, UpdateTable, Scan, GetItem, PutItem, UpdateItem, DeleteItem | Lease table (metadata table in DynamoDB) created by ConsumeKinesis: `arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}` | Required for ConsumeKinesis to manage the lease table created in DynamoDB. |
| Amazon DynamoDB | CreateTable, DescribeTable, Scan, GetItem, PutItem, UpdateItem, DeleteItem | Worker metrics and coordinator state tables (metadata tables in DynamoDB) created by ConsumeKinesis: `arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}-WorkerMetricStats`, `arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}-CoordinatorState` | Required for ConsumeKinesis to manage the worker metrics and coordinator state metadata tables in DynamoDB. |
| Amazon DynamoDB | Query | Global secondary index on the lease table: `arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}/index/*` | Required for ConsumeKinesis to read the global secondary index of the lease table created in DynamoDB. |
| Amazon CloudWatch | PutMetricData | `*` | Uploads metrics to CloudWatch that are useful for monitoring the application. The asterisk (*) is used because there is no specific resource in CloudWatch on which the PutMetricData action is invoked. |

The following is an example policy document for ConsumeKinesis:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamSummary",
        "kinesis:RegisterStreamConsumer",
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:ListShards"
      ],
      "Resource": ["arn:aws:kinesis:{Region}:{Account}:stream/{Stream Name}"]
    },
    {
      "Effect": "Allow",
      "Action": ["kinesis:SubscribeToShard", "kinesis:DescribeStreamConsumer"],
      "Resource": ["arn:aws:kinesis:{Region}:{Account}:stream/{Stream Name}/consumer/*"]
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:UpdateTable",
        "dynamodb:GetItem",
        "dynamodb:UpdateItem",
        "dynamodb:PutItem",
        "dynamodb:DeleteItem",
        "dynamodb:Scan"
      ],
      "Resource": ["arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}"]
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:UpdateItem",
        "dynamodb:PutItem",
        "dynamodb:DeleteItem",
        "dynamodb:Scan"
      ],
      "Resource": [
        "arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}-WorkerMetricStats",
        "arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}-CoordinatorState"
      ]
    },
    {
      "Effect": "Allow",
      "Action": ["dynamodb:Query"],
      "Resource": ["arn:aws:dynamodb:{Region}:{Account}:table/{Application Name}/index/*"]
    },
    {
      "Effect": "Allow",
      "Action": ["cloudwatch:PutMetricData"],
      "Resource": "*"
    }
  ]
}
```

Note: Replace "{Region}", "{Account}", "{Stream Name}", and "{Application Name}" in the ARNs with your own AWS region, AWS account number, Kinesis data stream name, and ConsumeKinesis Application Name property, respectively.
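As a quick illustration of that substitution, here is a minimal sketch (all values below are hypothetical, not defaults) that prints the concrete ARNs to paste into the policy:

```python
# Hypothetical example values -- replace with your own region, account,
# stream name, and ConsumeKinesis Application Name.
region = "us-east-1"
account = "123456789012"
stream_name = "orders-stream"
application_name = "nifi-orders-app"

arns = {
    "Kinesis stream":          f"arn:aws:kinesis:{region}:{account}:stream/{stream_name}",
    "EFO consumers":           f"arn:aws:kinesis:{region}:{account}:stream/{stream_name}/consumer/*",
    "Lease table":             f"arn:aws:dynamodb:{region}:{account}:table/{application_name}",
    "Worker metrics table":    f"arn:aws:dynamodb:{region}:{account}:table/{application_name}-WorkerMetricStats",
    "Coordinator state table": f"arn:aws:dynamodb:{region}:{account}:table/{application_name}-CoordinatorState",
    "Lease table GSI":         f"arn:aws:dynamodb:{region}:{account}:table/{application_name}/index/*",
}
for label, arn in arns.items():
    print(f"{label}: {arn}")
```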
Consumer Type
Comparison of the different consumer types, from the Amazon Kinesis Streams documentation:

| Characteristics | Shared throughput consumers without enhanced fan-out | Enhanced fan-out consumers |
|---|---|---|
| Read throughput | Fixed at a total of 2 MB/sec per shard. If multiple consumers read from the same shard, they all share this throughput; the sum of the throughputs they receive from the shard does not exceed 2 MB/sec. | Scales as consumers register to use enhanced fan-out. Each consumer registered to use enhanced fan-out receives its own read throughput per shard, up to 2 MB/sec, independently of other consumers. |
| Message propagation delay | An average of around 200 ms with one consumer reading from the stream, rising to around 1000 ms with five consumers. | Typically an average of 70 ms, whether there is one consumer or five. |
| Cost | Not applicable. | There is a data retrieval cost and a consumer-shard hour cost. For more information, see Amazon Kinesis Data Streams Pricing. |
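To make the read-throughput numbers concrete, here is a back-of-the-envelope calculation based on the 2 MB/sec per-shard figure in the table above:

```python
SHARD_READ_LIMIT_MB_S = 2.0  # per-shard read limit from the table above

for consumers in (1, 2, 5):
    shared = SHARD_READ_LIMIT_MB_S / consumers  # shared consumers split the per-shard limit
    efo = SHARD_READ_LIMIT_MB_S                 # each EFO consumer gets the full limit
    print(f"{consumers} consumer(s): shared = {shared:.2f} MB/s each, EFO = {efo:.2f} MB/s each")
```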
Record processing
When the Processing Strategy property is set to RECORD, ConsumeKinesis operates in Record mode. In this mode, the processor reads records from the Kinesis stream using the configured Record Reader and writes them to FlowFiles using the configured Record Writer.
The processor tries to optimize the number of FlowFiles created by batching multiple records with the same schema into a single FlowFile.
Schema changes
ConsumeKinesis supports dynamically changing Kinesis record schemas. When a record with a new schema is encountered, the currently open FlowFile is closed and a new FlowFile is created for the new schema. This way, the processor preserves record ordering within a single shard, even when record schemas change.
Note that the processor relies on the Record Reader to provide the correct schema for each record. A Reader that uses schema inference may produce many distinct schemas, which can lead to excessive FlowFile creation. If performance is a concern, use a Reader with a predefined schema or a schema registry.
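Conceptually, the batching behaves like grouping consecutive records by schema; the following is an illustrative sketch (not NiFi's implementation) with hypothetical schema ids:

```python
from itertools import groupby

# Each (schema_id, payload) pair stands in for a parsed Kinesis record,
# read in shard order; the schema ids are hypothetical.
records = [("A", 1), ("A", 2), ("B", 3), ("A", 4), ("A", 5)]

# Consecutive records that share a schema are written to one FlowFile;
# a schema change closes the current FlowFile and opens a new one.
for schema, batch in groupby(records, key=lambda r: r[0]):
    print(f"FlowFile(schema={schema}): {[payload for _, payload in batch]}")
# FlowFile(schema=A): [1, 2]
# FlowFile(schema=B): [3]
# FlowFile(schema=A): [4, 5]
```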
Output Strategies
This processor offers multiple strategies, configured via the Output Strategy processor property, for converting Kinesis records into FlowFiles.
- Use Content as Value (the default) writes only a Kinesis record value to a FlowFile.
- Use Wrapper writes a Kinesis Record value as well as metadata into separate fields of a FlowFile record.
- Inject Metadata writes a Kinesis Record value to a FlowFile record and adds a sub-record to it with metadata.
The written metadata includes the following fields:
- stream: The name of the Kinesis stream the record was received from.
- shardId: The identifier of the shard the record was received from.
- sequenceNumber: The sequence number of the record.
- subSequenceNumber: The subsequence number of the record, used when multiple smaller records are aggregated into a single Kinesis record. If a record was not part of a batch, this value will be 0.
- shardedSequenceNumber: A combination of the sequence number and subsequence number. This can be used to uniquely identify a record within a shard.
- partitionKey: The partition key of the record.
- approximateArrival: The approximate arrival timestamp of the record (in milliseconds since epoch).
The record schema used when Use Wrapper is selected is as follows (in Avro format):

```
{
  "type": "record",
  "name": "nifiRecord",
  "namespace": "org.apache.nifi",
  "fields": [
    {
      "name": "value",
      "type": [
        { < Fields as determined by the Record Reader for a Kinesis message > },
        "null"
      ]
    },
    {
      "name": "kinesisMetadata",
      "type": [
        {
          "type": "record",
          "name": "metadataType",
          "fields": [
            { "name": "stream", "type": ["string", "null"] },
            { "name": "shardId", "type": ["string", "null"] },
            { "name": "sequenceNumber", "type": ["string", "null"] },
            { "name": "subSequenceNumber", "type": ["long", "null"] },
            { "name": "shardedSequenceNumber", "type": ["string", "null"] },
            { "name": "partitionKey", "type": ["string", "null"] },
            {
              "name": "approximateArrival",
              "type": [{ "type": "long", "logicalType": "timestamp-millis" }, "null"]
            }
          ]
        },
        "null"
      ]
    }
  ]
}
```

The record schema used when Inject Metadata is selected is as follows (in Avro format):

```
{
  "type": "record",
  "name": "nifiRecord",
  "namespace": "org.apache.nifi",
  "fields": [
    < Fields as determined by the Record Reader for a Kinesis message >,
    {
      "name": "kinesisMetadata",
      "type": [
        {
          "type": "record",
          "name": "metadataType",
          "fields": [
            { "name": "stream", "type": ["string", "null"] },
            { "name": "shardId", "type": ["string", "null"] },
            { "name": "sequenceNumber", "type": ["string", "null"] },
            { "name": "subSequenceNumber", "type": ["long", "null"] },
            { "name": "shardedSequenceNumber", "type": ["string", "null"] },
            { "name": "partitionKey", "type": ["string", "null"] },
            {
              "name": "approximateArrival",
              "type": [{ "type": "long", "logicalType": "timestamp-millis" }, "null"]
            }
          ]
        },
        "null"
      ]
    }
  ]
}
```

Here is an example of FlowFile content emitted by JsonRecordSetWriter when the Use Wrapper strategy is selected:

```json
[
  {
    "value": {
      "address": "1234 First Street",
      "zip": "12345",
      "account": { "name": "Acme", "number": "AC1234" }
    },
    "kinesisMetadata": {
      "stream": "stream-name",
      "shardId": "shardId-000000000000",
      "sequenceNumber": "123456789",
      "subSequenceNumber": 3,
      "shardedSequenceNumber": "12345678900000000000000000003",
      "partitionKey": "123",
      "approximateArrival": 1756459596788
    }
  }
]
```

Here is an example of FlowFile content emitted by JsonRecordSetWriter when the Inject Metadata strategy is selected:

```json
[
  {
    "address": "1234 First Street",
    "zip": "12345",
    "account": { "name": "Acme", "number": "AC1234" },
    "kinesisMetadata": {
      "stream": "stream-name",
      "shardId": "shardId-000000000000",
      "sequenceNumber": "123456789",
      "subSequenceNumber": 3,
      "shardedSequenceNumber": "12345678900000000000000000003",
      "partitionKey": "123",
      "approximateArrival": 1756459596788
    }
  }
]
```
Properties
-
Application Name
The name of the Kinesis application. This is used for DynamoDB table naming and worker coordination.
- Display Name
- Application Name
- Description
- The name of the Kinesis application. This is used for DynamoDB table naming and worker coordination.
- API Name
- Application Name
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
AWS Credentials Provider Service
The Controller Service used to obtain an AWS credentials provider. Ensure that the supplied credentials have access to Kinesis, DynamoDB, and (optionally) CloudWatch.
- Display Name
- AWS Credentials Provider Service
- Description
- The Controller Service used to obtain an AWS credentials provider. Ensure that the supplied credentials have access to Kinesis, DynamoDB, and (optionally) CloudWatch.
- API Name
- AWS Credentials Provider Service
- Service Interface
- org.apache.nifi.processors.aws.credentials.provider.AwsCredentialsProviderService
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Checkpoint Interval
Interval between checkpointing consumed Kinesis records. To checkpoint records each time the Processor is run, set this value to 0 seconds. More frequent checkpointing may reduce performance and increase DynamoDB costs, while less frequent checkpointing may result in duplicates when a Shard lease is lost or NiFi is restarted.
- Display Name
- Checkpoint Interval
- Description
- Interval between checkpointing consumed Kinesis records. To checkpoint records each time the Processor is run, set this value to 0 seconds. More frequent checkpointing may reduce performance and increase DynamoDB costs, while less frequent checkpointing may result in duplicates when a Shard lease is lost or NiFi is restarted.
- API Name
- Checkpoint Interval
- Default Value
- 5 sec
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
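As an illustrative sketch of this trade-off (not NiFi code): records processed after the most recent checkpoint are re-delivered when a Shard lease is lost or NiFi restarts, so a longer interval means more potential duplicates:

```python
# Records consumed from a shard, in order; names are hypothetical.
consumed = ["r1", "r2", "r3", "r4", "r5"]
checkpointed_through = 2  # last checkpoint covered r1..r2, then NiFi restarted

# Consumption resumes from the checkpoint, so r3..r5 are seen a second time.
redelivered = consumed[checkpointed_through:]
print(redelivered)  # ['r3', 'r4', 'r5'] -> duplicates after restart
```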
-
Consumer Type
Strategy for reading records from Amazon Kinesis streams.
- Display Name
- Consumer Type
- Description
- Strategy for reading records from Amazon Kinesis streams.
- API Name
- Consumer Type
- Default Value
- ENHANCED_FAN_OUT
- Allowable Values
-
- Shared Throughput
- Enhanced Fan-Out
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Custom Region
Custom region, e.g. a region of an AWS-compatible service provider
- Display Name
- Custom Region
- Description
- Custom region, e.g. a region of an AWS-compatible service provider
- API Name
- Custom Region
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- true
- Dependencies
-
- Region is set to any of [use-custom-region]
-
Initial Stream Position
The position in the stream where the processor should start reading.
- Display Name
- Initial Stream Position
- Description
- The position in the stream where the processor should start reading.
- API Name
- Initial Stream Position
- Default Value
- TRIM_HORIZON
- Allowable Values
-
- Trim Horizon
- Latest
- At Timestamp
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Max Bytes to Buffer
The maximum size of Kinesis Records that can be buffered in memory before being processed by NiFi. If the buffer size exceeds this limit, the processor stops consuming new records until space becomes available. A larger value may increase throughput, at the expense of using more memory.
- Display Name
- Max Bytes to Buffer
- Description
- The maximum size of Kinesis Records that can be buffered in memory before being processed by NiFi. If the buffer size exceeds this limit, the processor stops consuming new records until space becomes available. A larger value may increase throughput, at the expense of using more memory.
- API Name
- Max Bytes to Buffer
- Default Value
- 100 MB
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Metrics Publishing
Specifies where Kinesis usage metrics are published.
- Display Name
- Metrics Publishing
- Description
- Specifies where Kinesis usage metrics are published.
- API Name
- Metrics Publishing
- Default Value
- DISABLED
- Allowable Values
-
- Disabled
- Logs
- CloudWatch
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Output Strategy
The format used to output a Kinesis Record into a FlowFile Record.
- Display Name
- Output Strategy
- Description
- The format used to output a Kinesis Record into a FlowFile Record.
- API Name
- Output Strategy
- Default Value
- USE_VALUE
- Allowable Values
-
- Use Content as Value
- Use Wrapper
- Inject Metadata
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Processing Strategy is set to any of [RECORD]
-
Processing Strategy
Strategy for processing Kinesis Records and writing serialized output to FlowFiles.
- Display Name
- Processing Strategy
- Description
- Strategy for processing Kinesis Records and writing serialized output to FlowFiles.
- API Name
- Processing Strategy
- Default Value
- FLOW_FILE
- Allowable Values
-
- FLOW_FILE
- RECORD
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Proxy Configuration Service
Specifies the Proxy Configuration Controller Service to proxy network requests.
- Display Name
- Proxy Configuration Service
- Description
- Specifies the Proxy Configuration Controller Service to proxy network requests.
- API Name
- Proxy Configuration Service
- Service Interface
- org.apache.nifi.proxy.ProxyConfigurationService
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Record Reader
The Record Reader to use for parsing the data received from Kinesis. The Record Reader is responsible for providing schemas for the records. Frequent schema changes may hinder the processor's performance.
- Display Name
- Record Reader
- Description
- The Record Reader to use for parsing the data received from Kinesis. The Record Reader is responsible for providing schemas for the records. Frequent schema changes may hinder the processor's performance.
- API Name
- Record Reader
- Service Interface
- org.apache.nifi.serialization.RecordReaderFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Processing Strategy is set to any of [RECORD]
-
Record Writer
The Record Writer to use for serializing records.
- Display Name
- Record Writer
- Description
- The Record Writer to use for serializing records.
- API Name
- Record Writer
- Service Interface
- org.apache.nifi.serialization.RecordSetWriterFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Processing Strategy is set to any of [RECORD]
-
Region
AWS Region in which the service is located
- Display Name
- Region
- Description
- AWS Region in which the service is located
- API Name
- Region
- Default Value
- us-west-2
- Allowable Values
-
- AWS GovCloud (US-East)
- AWS GovCloud (US-West)
- Africa (Cape Town)
- Asia Pacific (Hong Kong)
- Asia Pacific (Hyderabad)
- Asia Pacific (Jakarta)
- Asia Pacific (Malaysia)
- Asia Pacific (Melbourne)
- Asia Pacific (Mumbai)
- Asia Pacific (New Zealand)
- Asia Pacific (Osaka)
- Asia Pacific (Seoul)
- Asia Pacific (Singapore)
- Asia Pacific (Sydney)
- Asia Pacific (Taipei)
- Asia Pacific (Thailand)
- Asia Pacific (Tokyo)
- Canada (Central)
- Canada West (Calgary)
- China (Beijing)
- China (Ningxia)
- EU (Germany)
- EU ISOE West
- Europe (Frankfurt)
- Europe (Ireland)
- Europe (London)
- Europe (Milan)
- Europe (Paris)
- Europe (Spain)
- Europe (Stockholm)
- Europe (Zurich)
- Israel (Tel Aviv)
- Mexico (Central)
- Middle East (Bahrain)
- Middle East (UAE)
- South America (Sao Paulo)
- US East (N. Virginia)
- US East (Ohio)
- US ISO East
- US ISO WEST
- US ISOB East (Ohio)
- US ISOB West
- US ISOF EAST
- US ISOF SOUTH
- US West (N. California)
- US West (Oregon)
- aws global region
- aws-cn global region
- aws-iso global region
- aws-iso-b global region
- aws-iso-e global region
- aws-iso-f global region
- aws-us-gov global region
- Use Custom Region
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Stream Name
The name of the Kinesis stream to consume from.
- Display Name
- Stream Name
- Description
- The name of the Kinesis stream to consume from.
- API Name
- Stream Name
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Stream Position Timestamp
The timestamp position in the stream from which to start reading Kinesis Records. The timestamp must be in ISO 8601 format.
- Display Name
- Stream Position Timestamp
- Description
- The timestamp position in the stream from which to start reading Kinesis Records. The timestamp must be in ISO 8601 format.
- API Name
- Stream Position Timestamp
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Initial Stream Position is set to any of [AT_TIMESTAMP]
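As a sketch of producing a valid ISO 8601 value for this property (which ISO 8601 variants the processor accepts beyond this common form is not specified here):

```python
from datetime import datetime, timedelta, timezone

# Hypothetical choice: start reading from one hour in the past.
start = datetime.now(timezone.utc) - timedelta(hours=1)
print(start.replace(microsecond=0).isoformat())  # e.g. 2025-08-29T09:15:00+00:00
```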
System Resource Considerations
| Resource | Description |
|---|---|
| CPU | The processor uses additional CPU resources when consuming data from Kinesis. The consumption is started immediately after this Processor is scheduled. The consumption ends only when the Processor is stopped. |
| NETWORK | The processor will continually poll for new Records, requesting up to a maximum number of Records/bytes per call. This can result in sustained network usage. |
| MEMORY | ConsumeKinesis buffers Kinesis Records in memory until they can be processed. The maximum size of the buffer is controlled by the 'Max Bytes to Buffer' property. In addition, the processor may cache some amount of data for each shard when the processor's buffer is full. |
Relationships
| Name | Description |
|---|---|
| success | FlowFiles that are created when records are successfully read from Kinesis and parsed |
Writes Attributes
| Name | Description |
|---|---|
| aws.kinesis.stream.name | The name of the Kinesis Stream from which all Kinesis Records in the Flow File were read |
| aws.kinesis.shard.id | Shard ID from which all Kinesis Records in the Flow File were read |
| aws.kinesis.partition.key | Partition key of the last Kinesis Record in the Flow File |
| aws.kinesis.first.sequence.number | A Sequence Number of the first Kinesis Record in the Flow File |
| aws.kinesis.first.subsequence.number | A SubSequence Number of the first Kinesis Record in the Flow File. Generated by KPL when aggregating records into a single Kinesis Record |
| aws.kinesis.last.sequence.number | A Sequence Number of the last Kinesis Record in the Flow File |
| aws.kinesis.last.subsequence.number | A SubSequence Number of the last Kinesis Record in the Flow File. Generated by KPL when aggregating records into a single Kinesis Record |
| aws.kinesis.approximate.arrival.timestamp.ms | Approximate arrival timestamp of the last Kinesis Record in the Flow File |
| mime.type | Sets the mime.type attribute to the MIME Type specified by the Record Writer (if configured) |
| record.count | Number of records written to the FlowFiles by the Record Writer (if configured) |
| record.error.message | On failure, this attribute provides the error message encountered by the Record Reader or Record Writer (if configured) |