PublishKafka 2.0.0
- Bundle
- org.apache.nifi | nifi-kafka-nar
- Description
- Sends the contents of a FlowFile as either a message or as individual records to Apache Kafka using the Kafka Producer API. The messages to send may be individual FlowFiles, may be delimited using a user-specified delimiter (such as a new-line), or may be record-oriented data that can be read by the configured Record Reader. The complementary NiFi processor for fetching messages is ConsumeKafka.
- Tags
- Apache, Kafka, Message, PubSub, Put, Record, Send, avro, csv, json, logs
- Input Requirement
- REQUIRED
- Supports Sensitive Dynamic Properties
- false
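
Conceptually, this processor is a managed wrapper around the standard Kafka Producer API. As a rough, illustrative sketch only (not the processor's implementation), publishing one FlowFile's content as a single message corresponds to code like the following; the broker address, topic name, and key are placeholder assumptions, and in NiFi the connection details come from the Kafka Connection Service property rather than being set directly:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class PublishSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // In NiFi, broker connection details come from the Kafka Connection Service;
        // "localhost:9092" is a placeholder here.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // Corresponds to the Compression Type property below.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");

        byte[] key = "my-key".getBytes();              // Kafka Key / kafka.key attribute
        byte[] value = "flowfile content".getBytes();  // entire FlowFile content as one message

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // Topic Name property -> topic of the ProducerRecord
            producer.send(new ProducerRecord<>("my-topic", key, value));
            producer.flush();
        }
    }
}
```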
Properties
-
Delivery Guarantee
Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka Client acks property.
- Display Name
- Delivery Guarantee
- Description
- Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka Client acks property.
- API Name
- acks
- Default Value
- all
- Allowable Values
-
- Guarantee Replicated Delivery
- Guarantee Single Node Delivery
- Best Effort
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
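
In Kafka client terms, the three allowable values map onto the standard acks settings. The helper below is an illustrative sketch of that mapping, not processor code:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

class AcksMapping {
    // Rough mapping of the Delivery Guarantee choices onto the Kafka client acks setting.
    static Properties withAcks(String deliveryGuarantee) {
        Properties props = new Properties();
        switch (deliveryGuarantee) {
            case "Guarantee Replicated Delivery" -> props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for in-sync replicas
            case "Guarantee Single Node Delivery" -> props.put(ProducerConfig.ACKS_CONFIG, "1");  // leader acknowledgement only
            case "Best Effort" -> props.put(ProducerConfig.ACKS_CONFIG, "0");                     // fire and forget
            default -> throw new IllegalArgumentException(deliveryGuarantee);
        }
        return props;
    }
}
```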
-
Compression Type
Specifies the compression strategy for records sent to Kafka. Corresponds to Kafka Client compression.type property.
- Display Name
- Compression Type
- Description
- Specifies the compression strategy for records sent to Kafka. Corresponds to Kafka Client compression.type property.
- API Name
- compression.type
- Default Value
- none
- Allowable Values
-
- none
- gzip
- snappy
- lz4
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Failure Strategy
Specifies how the processor handles a FlowFile if it is unable to publish the data to Kafka
- Display Name
- Failure Strategy
- Description
- Specifies how the processor handles a FlowFile if it is unable to publish the data to Kafka
- API Name
- Failure Strategy
- Default Value
- Route to Failure
- Allowable Values
-
- Route to Failure
- Rollback
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
FlowFile Attribute Header Pattern
A Regular Expression that is matched against all FlowFile attribute names. Any attribute whose name matches the pattern will be added to the Kafka messages as a Header. If not specified, no FlowFile attributes will be added as headers.
- Display Name
- FlowFile Attribute Header Pattern
- Description
- A Regular Expression that is matched against all FlowFile attribute names. Any attribute whose name matches the pattern will be added to the Kafka messages as a Header. If not specified, no FlowFile attributes will be added as headers.
- API Name
- FlowFile Attribute Header Pattern
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
- Dependencies
-
- Publish Strategy is set to any of [USE_VALUE]
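
As an illustration only (not the processor's own code), the sketch below shows the intended effect: every FlowFile attribute whose name matches the pattern is copied onto the outgoing Kafka record as a header, serialized with the charset chosen by the Header Encoding property described next. The attribute names and regex are hypothetical:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.ProducerRecord;

class HeaderSketch {
    // Copy every attribute whose name matches the pattern onto the record as a Kafka header.
    static void addMatchingHeaders(ProducerRecord<byte[], byte[]> record,
                                   Map<String, String> flowFileAttributes,
                                   Pattern headerPattern,
                                   Charset headerEncoding) {
        flowFileAttributes.forEach((name, value) -> {
            if (headerPattern.matcher(name).matches()) {
                record.headers().add(name, value.getBytes(headerEncoding));
            }
        });
    }

    public static void main(String[] args) {
        ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>("my-topic", null, "payload".getBytes());
        addMatchingHeaders(record,
                Map.of("invoice.id", "42", "filename", "a.json"),
                Pattern.compile("invoice\\..*"),   // FlowFile Attribute Header Pattern
                StandardCharsets.UTF_8);           // Header Encoding
    }
}
```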
-
Header Encoding
For any attribute that is added as a Kafka Record Header, this property indicates the Character Encoding to use for serializing the headers.
- Display Name
- Header Encoding
- Description
- For any attribute that is added as a Kafka Record Header, this property indicates the Character Encoding to use for serializing the headers.
- API Name
- Header Encoding
- Default Value
- UTF-8
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- FlowFile Attribute Header Pattern is set to any value specified
-
Kafka Connection Service
Provides connections to Kafka Broker for publishing Kafka Records
- Display Name
- Kafka Connection Service
- Description
- Provides connections to Kafka Broker for publishing Kafka Records
- API Name
- Kafka Connection Service
- Service Interface
- org.apache.nifi.kafka.service.api.KafkaConnectionService
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Kafka Key
The Key to use for the Message. If not specified, the FlowFile attribute 'kafka.key' is used as the message key, if it is present. Beware that setting the Kafka key and demarcating at the same time may lead to many Kafka messages with the same key. Normally this is not a problem, as Kafka does not enforce or assume message and key uniqueness. Still, setting the demarcator and Kafka key at the same time poses a risk of data loss on Kafka: during topic compaction, messages will be deduplicated based on this key.
- Display Name
- Kafka Key
- Description
- The Key to use for the Message. If not specified, the FlowFile attribute 'kafka.key' is used as the message key, if it is present. Beware that setting the Kafka key and demarcating at the same time may lead to many Kafka messages with the same key. Normally this is not a problem, as Kafka does not enforce or assume message and key uniqueness. Still, setting the demarcator and Kafka key at the same time poses a risk of data loss on Kafka: during topic compaction, messages will be deduplicated based on this key.
- API Name
- Kafka Key
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
- Dependencies
-
- Publish Strategy is set to any of [USE_WRAPPER]
-
Kafka Key Attribute Encoding
FlowFiles that are emitted have an attribute named 'kafka.key'. This property dictates how the value of the attribute should be encoded.
- Display Name
- Kafka Key Attribute Encoding
- Description
- FlowFiles that are emitted have an attribute named 'kafka.key'. This property dictates how the value of the attribute should be encoded.
- API Name
- Kafka Key Attribute Encoding
- Default Value
- utf-8
- Allowable Values
-
- UTF-8 Encoded
- Hex Encoded
- Do Not Add Key as Attribute
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Publish Strategy is set to any of [USE_WRAPPER]
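
A small sketch of the difference between the two encodings for a key of raw bytes (the key value is hypothetical; java.util.HexFormat requires Java 17+):

```java
import java.nio.charset.StandardCharsets;
import java.util.HexFormat;

class KeyAttributeEncodingSketch {
    public static void main(String[] args) {
        byte[] keyBytes = "order-42".getBytes(StandardCharsets.UTF_8);

        String utf8Encoded = new String(keyBytes, StandardCharsets.UTF_8); // "order-42"
        String hexEncoded = HexFormat.of().formatHex(keyBytes);            // "6f726465722d3432"

        System.out.println(utf8Encoded);
        System.out.println(hexEncoded);
    }
}
```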
-
Max Request Size
The maximum size of a request in bytes. Corresponds to Kafka Client max.request.size property.
- Display Name
- Max Request Size
- Description
- The maximum size of a request in bytes. Corresponds to Kafka Client max.request.size property.
- API Name
- max.request.size
- Default Value
- 1 MB
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Message Demarcator
Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. To enter a special character such as 'new line', use CTRL+Enter or Shift+Enter, depending on your OS.
- Display Name
- Message Demarcator
- Description
- Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. To enter a special character such as 'new line', use CTRL+Enter or Shift+Enter, depending on your OS.
- API Name
- Message Demarcator
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
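
As a rough sketch of the demarcation behaviour (not the processor's implementation, which streams the content rather than holding it in memory), splitting on the demarcator and sending each piece as its own message looks roughly like this; the topic and demarcator are placeholders:

```java
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class DemarcatorSketch {
    // Split the FlowFile content on the demarcator and send each piece as its own message.
    static void publishDemarcated(KafkaProducer<byte[], byte[]> producer,
                                  String topic, String content, String demarcator) {
        for (String piece : content.split(Pattern.quote(demarcator))) {
            if (!piece.isEmpty()) {
                producer.send(new ProducerRecord<>(topic, null, piece.getBytes()));
            }
        }
    }
}
```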
-
Message Key Field
The name of a field in the Input Records that should be used as the Key for the Kafka message.
- Display Name
- Message Key Field
- Description
- The name of a field in the Input Records that should be used as the Key for the Kafka message.
- API Name
- Message Key Field
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
- Dependencies
-
- Publish Strategy is set to any of [USE_VALUE]
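
For example, with a JSON Record Reader and a hypothetical Message Key Field of "id", each record's "id" value becomes the Kafka message key, while the serialized record remains the message value:

```java
class MessageKeyFieldExample {
    // Hypothetical input: two JSON records read by the configured Record Reader.
    static final String INPUT_RECORDS = """
            [
              {"id": "42", "customer": "acme", "amount": 10.5},
              {"id": "43", "customer": "globex", "amount": 7.0}
            ]
            """;
    // With Message Key Field = "id", this yields two Kafka messages
    // with keys "42" and "43" and the serialized records as values.
}
```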
-
Partition
Specifies the Kafka Partition destination for Records.
- Display Name
- Partition
- Description
- Specifies the Kafka Partition destination for Records.
- API Name
- partition
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
-
Partitioner Class
Specifies which class to use to compute a partition id for a message. Corresponds to Kafka Client partitioner.class property.
- Display Name
- Partitioner Class
- Description
- Specifies which class to use to compute a partition id for a message. Corresponds to Kafka Client partitioner.class property.
- API Name
- partitioner.class
- Default Value
- org.apache.kafka.clients.producer.internals.DefaultPartitioner
- Allowable Values
-
- RoundRobinPartitioner
- DefaultPartitioner
- Expression Language Partitioner
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Publish Strategy
The format used to publish the incoming FlowFile record to Kafka.
- Display Name
- Publish Strategy
- Description
- The format used to publish the incoming FlowFile record to Kafka.
- API Name
- Publish Strategy
- Default Value
- USE_VALUE
- Allowable Values
-
- Use Content as Record Value
- Use Wrapper
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Record Reader is set to any value specified
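
To illustrate the difference between the two strategies: with "Use Content as Record Value", each incoming record becomes the message value as-is; with "Use Wrapper", each incoming record is expected to wrap the key, value, headers, and (optionally) metadata for the outgoing message. The JSON below is an illustrative assumption of such inputs, not a normative schema:

```java
class PublishStrategyExample {
    // Use Content as Record Value: the whole record becomes the message value.
    static final String USE_VALUE_RECORD = """
            {"id": "42", "customer": "acme"}
            """;

    // Use Wrapper: the record wraps key, value, headers and (optionally) metadata
    // for the outgoing message. Field contents here are illustrative only.
    static final String USE_WRAPPER_RECORD = """
            {
              "key": "42",
              "value": {"id": "42", "customer": "acme"},
              "headers": {"source": "invoicing"},
              "metadata": {"topic": "invoices", "partition": 0}
            }
            """;
}
```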
-
Record Key Writer
The Record Key Writer to use for outgoing FlowFiles
- Display Name
- Record Key Writer
- Description
- The Record Key Writer to use for outgoing FlowFiles
- API Name
- Record Key Writer
- Service Interface
- org.apache.nifi.serialization.RecordSetWriterFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
- Dependencies
-
- Publish Strategy is set to any of [USE_WRAPPER]
-
Record Metadata Strategy
Specifies whether the Record's metadata (topic and partition) should come from the Record's metadata field or from the configured Topic Name and Partition / Partitioner Class properties.
- Display Name
- Record Metadata Strategy
- Description
- Specifies whether the Record's metadata (topic and partition) should come from the Record's metadata field or from the configured Topic Name and Partition / Partitioner Class properties.
- API Name
- Record Metadata Strategy
- Default Value
- FROM_PROPERTIES
- Allowable Values
-
- Metadata From Record
- Use Configured Values
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Publish Strategy is set to any of [USE_WRAPPER]
-
Record Reader
The Record Reader to use for incoming FlowFiles
- Display Name
- Record Reader
- Description
- The Record Reader to use for incoming FlowFiles
- API Name
- Record Reader
- Service Interface
- org.apache.nifi.serialization.RecordReaderFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Record Writer
The Record Writer to use in order to serialize the data before sending to Kafka
- Display Name
- Record Writer
- Description
- The Record Writer to use in order to serialize the data before sending to Kafka
- API Name
- Record Writer
- Service Interface
- org.apache.nifi.serialization.RecordSetWriterFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Topic Name
Name of the Kafka Topic to which the Processor publishes Kafka Records
- Display Name
- Topic Name
- Description
- Name of the Kafka Topic to which the Processor publishes Kafka Records
- API Name
- Topic Name
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- true
-
Transactional ID Prefix
Specifies the prefix for the KafkaProducer transactional.id property: the transactional.id will be a generated UUID prefixed with the configured string.
- Display Name
- Transactional ID Prefix
- Description
- Specifies the prefix for the KafkaProducer transactional.id property: the transactional.id will be a generated UUID prefixed with the configured string.
- API Name
- Transactional ID Prefix
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
- Dependencies
-
- Transactions Enabled is set to any of [true]
-
Transactions Enabled
Specifies whether to provide transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true requires that the [Delivery Guarantee] property be set to [Guarantee Replicated Delivery].
- Display Name
- Transactions Enabled
- Description
- Specifies whether to provide transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true requires that the [Delivery Guarantee] property be set to [Guarantee Replicated Delivery].
- API Name
- Transactions Enabled
- Default Value
- true
- Allowable Values
-
- true
- false
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
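
In Kafka client terms, enabling transactions roughly corresponds to the transactional producer pattern sketched below (illustrative only; the broker address and topic are placeholders, and the transactional.id mimics the prefix-plus-UUID behaviour described under Transactional ID Prefix):

```java
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class TransactionalPublishSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // transactions require replicated delivery
        // Transactional ID Prefix + generated UUID
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "nifi-publish-" + UUID.randomUUID());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                producer.send(new ProducerRecord<>("my-topic", null, "message".getBytes()));
                producer.commitTransaction();  // messages become visible to read_committed consumers
            } catch (RuntimeException e) {
                producer.abortTransaction();   // messages are discarded, akin to a rollback on failure
                throw e;
            }
        }
    }
}
```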
Relationships
Name | Description |
---|---|
failure | Any FlowFile that cannot be sent to Kafka will be routed to this Relationship |
success | FlowFiles for which all content was sent to Kafka. |
Reads Attributes
Name | Description |
---|---|
kafka.tombstone | If this attribute is set to 'true', the processor is not configured with a demarcator, and the FlowFile's content is empty, then a tombstone message with zero bytes will be sent to Kafka. |
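
In Kafka terms, a tombstone is simply a record whose value is null; a minimal sketch:

```java
import org.apache.kafka.clients.producer.ProducerRecord;

class TombstoneSketch {
    // A tombstone is a record with a null value; with log compaction enabled,
    // it deletes earlier records that share the same key.
    static ProducerRecord<byte[], byte[]> tombstoneFor(String topic, byte[] key) {
        return new ProducerRecord<>(topic, key, null);
    }
}
```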
Writes Attributes
Name | Description |
---|---|
msg.count | The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to FlowFiles that are routed to success. |
See Also