-
Processors
- AttributeRollingWindow
- AttributesToCSV
- AttributesToJSON
- CalculateRecordStats
- CaptureChangeMySQL
- CompressContent
- ConnectWebSocket
- ConsumeAMQP
- ConsumeAzureEventHub
- ConsumeElasticsearch
- ConsumeGCPubSub
- ConsumeIMAP
- ConsumeJMS
- ConsumeKafka
- ConsumeKinesisStream
- ConsumeMQTT
- ConsumePOP3
- ConsumeSlack
- ConsumeTwitter
- ConsumeWindowsEventLog
- ControlRate
- ConvertCharacterSet
- ConvertRecord
- CopyAzureBlobStorage_v12
- CopyS3Object
- CountText
- CryptographicHashContent
- DebugFlow
- DecryptContentAge
- DecryptContentPGP
- DeduplicateRecord
- DeleteAzureBlobStorage_v12
- DeleteAzureDataLakeStorage
- DeleteByQueryElasticsearch
- DeleteDynamoDB
- DeleteFile
- DeleteGCSObject
- DeleteGridFS
- DeleteMongo
- DeleteS3Object
- DeleteSFTP
- DeleteSQS
- DetectDuplicate
- DistributeLoad
- DuplicateFlowFile
- EncodeContent
- EncryptContentAge
- EncryptContentPGP
- EnforceOrder
- EvaluateJsonPath
- EvaluateXPath
- EvaluateXQuery
- ExecuteGroovyScript
- ExecuteProcess
- ExecuteScript
- ExecuteSQL
- ExecuteSQLRecord
- ExecuteStreamCommand
- ExtractAvroMetadata
- ExtractEmailAttachments
- ExtractEmailHeaders
- ExtractGrok
- ExtractHL7Attributes
- ExtractRecordSchema
- ExtractText
- FetchAzureBlobStorage_v12
- FetchAzureDataLakeStorage
- FetchBoxFile
- FetchDistributedMapCache
- FetchDropbox
- FetchFile
- FetchFTP
- FetchGCSObject
- FetchGoogleDrive
- FetchGridFS
- FetchS3Object
- FetchSFTP
- FetchSmb
- FilterAttribute
- FlattenJson
- ForkEnrichment
- ForkRecord
- GenerateFlowFile
- GenerateRecord
- GenerateTableFetch
- GeoEnrichIP
- GeoEnrichIPRecord
- GeohashRecord
- GetAsanaObject
- GetAwsPollyJobStatus
- GetAwsTextractJobStatus
- GetAwsTranscribeJobStatus
- GetAwsTranslateJobStatus
- GetAzureEventHub
- GetAzureQueueStorage_v12
- GetDynamoDB
- GetElasticsearch
- GetFile
- GetFTP
- GetGcpVisionAnnotateFilesOperationStatus
- GetGcpVisionAnnotateImagesOperationStatus
- GetHubSpot
- GetMongo
- GetMongoRecord
- GetS3ObjectMetadata
- GetSFTP
- GetShopify
- GetSmbFile
- GetSNMP
- GetSplunk
- GetSQS
- GetWorkdayReport
- GetZendesk
- HandleHttpRequest
- HandleHttpResponse
- IdentifyMimeType
- InvokeHTTP
- InvokeScriptedProcessor
- ISPEnrichIP
- JoinEnrichment
- JoltTransformJSON
- JoltTransformRecord
- JSLTTransformJSON
- JsonQueryElasticsearch
- ListAzureBlobStorage_v12
- ListAzureDataLakeStorage
- ListBoxFile
- ListDatabaseTables
- ListDropbox
- ListenFTP
- ListenHTTP
- ListenOTLP
- ListenSlack
- ListenSyslog
- ListenTCP
- ListenTrapSNMP
- ListenUDP
- ListenUDPRecord
- ListenWebSocket
- ListFile
- ListFTP
- ListGCSBucket
- ListGoogleDrive
- ListS3
- ListSFTP
- ListSmb
- LogAttribute
- LogMessage
- LookupAttribute
- LookupRecord
- MergeContent
- MergeRecord
- ModifyBytes
- ModifyCompression
- MonitorActivity
- MoveAzureDataLakeStorage
- Notify
- PackageFlowFile
- PaginatedJsonQueryElasticsearch
- ParseEvtx
- ParseNetflowv5
- ParseSyslog
- ParseSyslog5424
- PartitionRecord
- PublishAMQP
- PublishGCPubSub
- PublishJMS
- PublishKafka
- PublishMQTT
- PublishSlack
- PutAzureBlobStorage_v12
- PutAzureCosmosDBRecord
- PutAzureDataExplorer
- PutAzureDataLakeStorage
- PutAzureEventHub
- PutAzureQueueStorage_v12
- PutBigQuery
- PutBoxFile
- PutCloudWatchMetric
- PutDatabaseRecord
- PutDistributedMapCache
- PutDropbox
- PutDynamoDB
- PutDynamoDBRecord
- PutElasticsearchJson
- PutElasticsearchRecord
- PutEmail
- PutFile
- PutFTP
- PutGCSObject
- PutGoogleDrive
- PutGridFS
- PutKinesisFirehose
- PutKinesisStream
- PutLambda
- PutMongo
- PutMongoBulkOperations
- PutMongoRecord
- PutRecord
- PutRedisHashRecord
- PutS3Object
- PutSalesforceObject
- PutSFTP
- PutSmbFile
- PutSNS
- PutSplunk
- PutSplunkHTTP
- PutSQL
- PutSQS
- PutSyslog
- PutTCP
- PutUDP
- PutWebSocket
- PutZendeskTicket
- QueryAirtableTable
- QueryAzureDataExplorer
- QueryDatabaseTable
- QueryDatabaseTableRecord
- QueryRecord
- QuerySalesforceObject
- QuerySplunkIndexingStatus
- RemoveRecordField
- RenameRecordField
- ReplaceText
- ReplaceTextWithMapping
- RetryFlowFile
- RouteHL7
- RouteOnAttribute
- RouteOnContent
- RouteText
- RunMongoAggregation
- SampleRecord
- ScanAttribute
- ScanContent
- ScriptedFilterRecord
- ScriptedPartitionRecord
- ScriptedTransformRecord
- ScriptedValidateRecord
- SearchElasticsearch
- SegmentContent
- SendTrapSNMP
- SetSNMP
- SignContentPGP
- SplitAvro
- SplitContent
- SplitExcel
- SplitJson
- SplitPCAP
- SplitRecord
- SplitText
- SplitXml
- StartAwsPollyJob
- StartAwsTextractJob
- StartAwsTranscribeJob
- StartAwsTranslateJob
- StartGcpVisionAnnotateFilesOperation
- StartGcpVisionAnnotateImagesOperation
- TagS3Object
- TailFile
- TransformXml
- UnpackContent
- UpdateAttribute
- UpdateByQueryElasticsearch
- UpdateCounter
- UpdateDatabaseTable
- UpdateRecord
- ValidateCsv
- ValidateJson
- ValidateRecord
- ValidateXml
- VerifyContentMAC
- VerifyContentPGP
- Wait
-
Controller Services
- ADLSCredentialsControllerService
- ADLSCredentialsControllerServiceLookup
- AmazonGlueSchemaRegistry
- ApicurioSchemaRegistry
- AvroReader
- AvroRecordSetWriter
- AvroSchemaRegistry
- AWSCredentialsProviderControllerService
- AzureBlobStorageFileResourceService
- AzureCosmosDBClientService
- AzureDataLakeStorageFileResourceService
- AzureEventHubRecordSink
- AzureStorageCredentialsControllerService_v12
- AzureStorageCredentialsControllerServiceLookup_v12
- CEFReader
- ConfluentEncodedSchemaReferenceReader
- ConfluentEncodedSchemaReferenceWriter
- ConfluentSchemaRegistry
- CSVReader
- CSVRecordLookupService
- CSVRecordSetWriter
- DatabaseRecordLookupService
- DatabaseRecordSink
- DatabaseTableSchemaRegistry
- DBCPConnectionPool
- DBCPConnectionPoolLookup
- DistributedMapCacheLookupService
- ElasticSearchClientServiceImpl
- ElasticSearchLookupService
- ElasticSearchStringLookupService
- EmailRecordSink
- EmbeddedHazelcastCacheManager
- ExcelReader
- ExternalHazelcastCacheManager
- FreeFormTextRecordSetWriter
- GCPCredentialsControllerService
- GCSFileResourceService
- GrokReader
- HazelcastMapCacheClient
- HikariCPConnectionPool
- HttpRecordSink
- IPLookupService
- JettyWebSocketClient
- JettyWebSocketServer
- JMSConnectionFactoryProvider
- JndiJmsConnectionFactoryProvider
- JsonConfigBasedBoxClientService
- JsonPathReader
- JsonRecordSetWriter
- JsonTreeReader
- Kafka3ConnectionService
- KerberosKeytabUserService
- KerberosPasswordUserService
- KerberosTicketCacheUserService
- LoggingRecordSink
- MapCacheClientService
- MapCacheServer
- MongoDBControllerService
- MongoDBLookupService
- PropertiesFileLookupService
- ProtobufReader
- ReaderLookup
- RecordSetWriterLookup
- RecordSinkServiceLookup
- RedisConnectionPoolService
- RedisDistributedMapCacheClientService
- RestLookupService
- S3FileResourceService
- ScriptedLookupService
- ScriptedReader
- ScriptedRecordSetWriter
- ScriptedRecordSink
- SetCacheClientService
- SetCacheServer
- SimpleCsvFileLookupService
- SimpleDatabaseLookupService
- SimpleKeyValueLookupService
- SimpleRedisDistributedMapCacheClientService
- SimpleScriptedLookupService
- SiteToSiteReportingRecordSink
- SlackRecordSink
- SmbjClientProviderService
- StandardAsanaClientProviderService
- StandardAzureCredentialsControllerService
- StandardDropboxCredentialService
- StandardFileResourceService
- StandardHashiCorpVaultClientService
- StandardHttpContextMap
- StandardJsonSchemaRegistry
- StandardKustoIngestService
- StandardKustoQueryService
- StandardOauth2AccessTokenProvider
- StandardPGPPrivateKeyService
- StandardPGPPublicKeyService
- StandardPrivateKeyService
- StandardProxyConfigurationService
- StandardRestrictedSSLContextService
- StandardS3EncryptionService
- StandardSSLContextService
- StandardWebClientServiceProvider
- Syslog5424Reader
- SyslogReader
- UDPEventRecordSink
- VolatileSchemaCache
- WindowsEventLogReader
- XMLFileLookupService
- XMLReader
- XMLRecordSetWriter
- YamlTreeReader
- ZendeskRecordSink
JoinEnrichment 2.0.0
- Bundle
- org.apache.nifi | nifi-standard-nar
- Description
- Joins together Records from two different FlowFiles where one FlowFile, the 'original' contains arbitrary records and the second FlowFile, the 'enrichment' contains additional data that should be used to enrich the first. See Additional Details for more information on how to configure this processor and the different use cases that it aims to accomplish.
- Tags
- combine, enrichment, fork, join, merge, record, recordpath, sql, streams, wrap
- Input Requirement
- REQUIRED
- Supports Sensitive Dynamic Properties
- false
Properties
-
Default Decimal Precision
When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type, a specific 'precision' denoting number of available digits is required. Generally, precision is defined by column data type definition or database engines default. However undefined precision (0) can be returned from some database engines. 'Default Decimal Precision' is used when writing those undefined precision numbers.
- Display Name
- Default Decimal Precision
- Description
- When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type, a specific 'precision' denoting number of available digits is required. Generally, precision is defined by column data type definition or database engines default. However undefined precision (0) can be returned from some database engines. 'Default Decimal Precision' is used when writing those undefined precision numbers.
- API Name
- dbf-default-precision
- Default Value
- 10
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
- Dependencies
-
- Join Strategy is set to any of [SQL]
-
Default Decimal Scale
When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type, a specific 'scale' denoting number of available decimal digits is required. Generally, scale is defined by column data type definition or database engines default. However when undefined precision (0) is returned, scale can also be uncertain with some database engines. 'Default Decimal Scale' is used when writing those undefined numbers. If a value has more decimals than specified scale, then the value will be rounded-up, e.g. 1.53 becomes 2 with scale 0, and 1.5 with scale 1.
- Display Name
- Default Decimal Scale
- Description
- When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type, a specific 'scale' denoting number of available decimal digits is required. Generally, scale is defined by column data type definition or database engines default. However when undefined precision (0) is returned, scale can also be uncertain with some database engines. 'Default Decimal Scale' is used when writing those undefined numbers. If a value has more decimals than specified scale, then the value will be rounded-up, e.g. 1.53 becomes 2 with scale 0, and 1.5 with scale 1.
- API Name
- dbf-default-scale
- Default Value
- 0
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
- Dependencies
-
- Join Strategy is set to any of [SQL]
-
Enrichment Record Reader
The Record Reader for reading the 'enrichment' FlowFile
- Display Name
- Enrichment Record Reader
- Description
- The Record Reader for reading the 'enrichment' FlowFile
- API Name
- Enrichment Record Reader
- Service Interface
- org.apache.nifi.serialization.RecordReaderFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Insertion Record Path
Specifies where in the 'original' Record the 'enrichment' Record's fields should be inserted. Note that if the RecordPath does not point to any existing field in the original Record, the enrichment will not be inserted.
- Display Name
- Insertion Record Path
- Description
- Specifies where in the 'original' Record the 'enrichment' Record's fields should be inserted. Note that if the RecordPath does not point to any existing field in the original Record, the enrichment will not be inserted.
- API Name
- Insertion Record Path
- Default Value
- /
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- true
- Dependencies
-
- Join Strategy is set to any of [Insert Enrichment Fields]
-
Join Strategy
Specifies how to join the two FlowFiles into a single FlowFile
- Display Name
- Join Strategy
- Description
- Specifies how to join the two FlowFiles into a single FlowFile
- API Name
- Join Strategy
- Default Value
- Wrapper
- Allowable Values
-
- Wrapper
- SQL
- Insert Enrichment Fields
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Maximum number of Bins
Specifies the maximum number of bins that can be held in memory at any one time
- Display Name
- Maximum number of Bins
- Description
- Specifies the maximum number of bins that can be held in memory at any one time
- API Name
- Maximum number of Bins
- Default Value
- 10000
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Original Record Reader
The Record Reader for reading the 'original' FlowFile
- Display Name
- Original Record Reader
- Description
- The Record Reader for reading the 'original' FlowFile
- API Name
- Original Record Reader
- Service Interface
- org.apache.nifi.serialization.RecordReaderFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Record Writer
The Record Writer to use for writing the results. If the Record Writer is configured to inherit the schema from the Record, the schema that it will inherit will be the result of merging both the 'original' record schema and the 'enrichment' record schema.
- Display Name
- Record Writer
- Description
- The Record Writer to use for writing the results. If the Record Writer is configured to inherit the schema from the Record, the schema that it will inherit will be the result of merging both the 'original' record schema and the 'enrichment' record schema.
- API Name
- Record Writer
- Service Interface
- org.apache.nifi.serialization.RecordSetWriterFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
SQL
The SQL SELECT statement to evaluate. Expression Language may be provided, but doing so may result in poorer performance. Because this Processor is dealing with two FlowFiles at a time, it's also important to understand how attributes will be referenced. If both FlowFiles have an attribute with the same name but different values, the Expression Language will resolve to the value provided by the 'enrichment' FlowFile.
- Display Name
- SQL
- Description
- The SQL SELECT statement to evaluate. Expression Language may be provided, but doing so may result in poorer performance. Because this Processor is dealing with two FlowFiles at a time, it's also important to understand how attributes will be referenced. If both FlowFiles have an attribute with the same name but different values, the Expression Language will resolve to the value provided by the 'enrichment' FlowFile.
- API Name
- SQL
- Default Value
- SELECT original.*, enrichment.* FROM original LEFT OUTER JOIN enrichment ON original.id = enrichment.id
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- true
- Dependencies
-
- Join Strategy is set to any of [SQL]
-
Timeout
Specifies the maximum amount of time to wait for the second FlowFile once the first arrives at the processor, after which point the first FlowFile will be routed to the 'timeout' relationship.
- Display Name
- Timeout
- Description
- Specifies the maximum amount of time to wait for the second FlowFile once the first arrives at the processor, after which point the first FlowFile will be routed to the 'timeout' relationship.
- API Name
- Timeout
- Default Value
- 10 min
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
System Resource Considerations
Resource | Description |
---|---|
MEMORY | This Processor will load into heap all FlowFiles that are on its incoming queues. While it loads the FlowFiles themselves, and not their content, the FlowFile attributes can be very memory intensive. Additionally, if the Join Strategy is set to SQL, the SQL engine may require buffering the entire contents of the enrichment FlowFile for each concurrent task. See Processor's Additional Details for more details and for steps on how to mitigate these concerns. |
Relationships
Name | Description |
---|---|
original | Both of the incoming FlowFiles ('original' and 'enrichment') will be routed to this Relationship. I.e., this is the 'original' version of both of these FlowFiles. |
timeout | If one of the incoming FlowFiles (i.e., the 'original' FlowFile or the 'enrichment' FlowFile) arrives to this Processor but the other does not arrive within the configured Timeout period, the FlowFile that did arrive is routed to this relationship. |
failure | If both the 'original' and 'enrichment' FlowFiles arrive at the processor but there was a failure in joining the records, both of those FlowFiles will be routed to this relationship. |
joined | The resultant FlowFile with Records joined together from both the original and enrichment FlowFiles will be routed to this relationship |
Writes Attributes
Name | Description |
---|---|
mime.type | Sets the mime.type attribute to the MIME Type specified by the Record Writer |
record.count | The number of records in the FlowFile |
See Also