Introduction
This guide is intended to provide an introduction and some guidance to developing extensions for Apache NiFi using Python. This guide is not intended to be an alternative to the NiFi Developers Guide document but rather a supplement to it. The normal Developer Guide is far more in depth and discusses more topics. However, that guide is targeted toward Java developers. The philosophies and guidance offered in that guide, generally, still hold for Python extensions, though.
Java/Python Communication
While NiFi is a Java based application, we do allow for native Python based processors. In order for this to work, it is essential that both the Java and Python processes be able to communicate with one another. To facilitate this, when a Python process is launched, a server is started on both the Java and Python sides. This server is started in such a way that it listens only on local network interfaces. That is, it is not possible to connect to either the Java or Python server from another machine. Connections must be made from localhost. This provides an important security layer.
There are objects on the Java side that must be made available to the Python side. Likewise, the Python side must return information to the Java side. For example, the Java application is responsible for storing the flow definition, such as the fact that some Processor exists, the configuration of that Processor, etc. It’s also responsible for maintaining the FlowFiles and their data. This information must be conveyed over the socket from the Java side to the Python side. Once a Python Processor performs its task and wants to route a given FlowFile to some relationship, this information must also be conveyed back to the Java side.
Fortunately, the NiFi API handles all of this and makes this seamless. This is handled by means of object proxies.
Object Proxies
Any time that a Java object must be made available to the Python API, it is made available via a proxy object. This means that in order to access a Java object from Python, we need to simply call the appropriate method on the Python proxy. When this method is called, a message is generated by the Python object and sent over the socket. That message is essentially an encoding of "Invoke method ABC on object XYZ, using arguments W, X, and Y."
For example, if we have an InputFlowFile
object named flowFile
and we want the filename
attribute, we can do so by calling:
filename = flowFile.getAttribute('filename')
From the Python API perspective, this is all that is necessary. Behind the scenes, a message is written to the local socket that is an encoding of the message "Invoke the getAttribute method on the object with ID 679212, with String argument 'filename'". The Java process then receives this command, invokes the specified method on the object with the given identifier, and writes back to the socket the result of that method invocation. As a result, the Python side receives the value of the "filename" attribute.
This is important to understand, because it means that any method invocation that occurs on a Java object must be serialized, written over the socket, deserialized, and then the method can be invoked. The result must then be serialized, written over the socket, and the result deserialized on the Python side. While this is a fairly efficient process, it is not nearly as efficient as simply invoking a method natively. As a result, it is important to consider the overhead of method invocations on Java objects.
Object age-off
It is also important to understand that any time that an object is provided as an argument to a Python Processor, that object can only be accessed on the Python side as long as the object is made available on the Java side. Because the java side cannot store all objects indefinitely, some cleanup must happen. This cleanup happens immediately after the method is invoked.
That means that if the transform
of a FlowFileTransform
is called with a ProcessContext
object, that object is available for use ONLY during the
method invocation. As soon as the method returns (successfully or not), the object will no longer be available for use. As a result, objects provided to method
invocations should not be stored for later use, such as assigning a value to self.processContext
.
Referencing an object that is no longer accessible will result in an error similar to:
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last): File "/Users/mpayne/devel/nifi/nifi-assembly/target/nifi-2.0.0-SNAPSHOT-bin/nifi-2.0.0-SNAPSHOT/python/framework/py4j/java_gateway.py", line 2466, in _call_proxy return_value = getattr(self.pool[obj_id], method)(*params) File "/Users/mpayne/devel/nifi/nifi-assembly/target/nifi-2.0.0-SNAPSHOT-bin/nifi-2.0.0-SNAPSHOT/./python/extensions/SetRecordField.py", line 22, in transform <Your Line of Python Code> File "/Users/mpayne/devel/nifi/nifi-assembly/target/nifi-2.0.0-SNAPSHOT-bin/nifi-2.0.0-SNAPSHOT/python/framework/py4j/java_gateway.py", line 1460, in __str__ return self.toString() File "/Users/mpayne/devel/nifi/nifi-assembly/target/nifi-2.0.0-SNAPSHOT-bin/nifi-2.0.0-SNAPSHOT/python/framework/py4j/java_gateway.py", line 1322, in __call__ return_value = get_return_value( File "/Users/mpayne/devel/nifi/nifi-assembly/target/nifi-2.0.0-SNAPSHOT-bin/nifi-2.0.0-SNAPSHOT/python/framework/py4j/protocol.py", line 330, in get_return_value raise Py4JError( py4j.protocol.Py4JError: An error occurred while calling o15380.toString. Trace: py4j.Py4JException: Target Object ID does not exist for this gateway :o15380 at py4j.Gateway.invoke(Gateway.java:279) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at org.apache.nifi.py4j.server.NiFiGatewayConnection.run(NiFiGatewayConnection.java:91) at java.lang.Thread.run(Thread.java:750)
This indication "Target Object ID does not exist for this gateway…" indicates that the Python code is attempting to access a Java object that is no longer accessible.
Processor API
In the initial release of the feature that makes Python a first class citizen for extensions, we will focus purely on Processors. Initially, there will be no ability to develop Controller Services in Python, though Python based Processors may make use of existing Controller Services.
The Processor API that exists for Java is more general and less prescriptive than the Python counterpart. The Java API allows for a very wide array of possibilities in terms of the types of components that may be built. The Python API, on the other hand, is more narrowly scoped and prescriptive. There are many reasons for this:
-
It is easier to encourage best practices for components with a more narrowly focused API.
-
Most of the use cases in which we see a need for Python-based extension points (based on previous use of scripting Processors such as ExecuteScript) tend to be around data manipulation and/or complex evaluation of data.
-
More narrowly focused APIs result in code that requires less boilerplate.
-
Calls from Python to Java (and vice versa) are far more expensive than native method calls. Having APIs that are more tailored toward specific use cases allows for fewer interactions between the two processes, which greatly improves performance.
As a result, the Python API consists of three different Processor classes that can be implemented: FlowFileTransform
, RecordTransform
and FlowFileSource
.
Others may emerge in the future.
FlowFileTransform
The FlowFileTransform
API provides a mechanism for routing and transforming a FlowFile based on its attributes as well as its
textual or binary contents. Contrast this with the RecordTransform
API, which provides a mechanism for routing and transforming
individual Records (such as JSON, Avro or CSV Records, for example).
In order to implement the FlowFileTransform
API, a Python class must extend from the nifiapi.FlowFileTransform
class
and implement the transform(ProcessContext, InputFlowFile)
method, which returns a FlowFileTransformResult
.
Additionally, the Processor class must provide two pieces of information as subclasses: the Java interface that it implements
(which will always be org.apache.nifi.python.processor.FlowFileTransform
) and any details about the Processor, such as the
version, a description, keywords/tags that might be associated with the Processor, etc.
These will be discussed in more details below, in the ProcessorDetails and Java inner classes section.
As such, a simple implementation may look like this:
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult class WriteHelloWorld(FlowFileTransform): class Java: implements = ['org.apache.nifi.python.processor.FlowFileTransform'] class ProcessorDetails: version = '0.0.1-SNAPSHOT' def __init__(self, **kwargs): pass def transform(self, context, flowfile): return FlowFileTransformResult(relationship = "success", contents = "Hello World", attributes = {"greeting": "hello"})
The transform
method is expected to take two arguments: the context (of type nifiapi.properties.ProcessContext
) and
the flowfile (of type InputFlowFile
).
The return type is a FlowFileTransformResult
that indicates which Relationship the FlowFile should be transferred to,
the updated contents of the FlowFile, and any attributes that should be added to the FlowFile (or overwritten). The
relationship
is a required argument. The contents
is optional. If the contents of the FlowFile are not to be updated,
the contents
should be unspecified or should be specified as None
. The original FlowFile contents should not be returned,
as it will have the same effect as passing None
but will be more expensive, as the contents will be written out to the FlowFile.
Likewise, it is more efficient to omit the attributes
unless there is any attribute to add.
context
The context
parameter is of type nifiapi.properties.ProcessContext
. This class can be used to determine configuration, such as the
Processor’s name (via context.getName()
) and property values (via the context.getProperties()
and context.getProperty(String propertyName)
)
methods.
Note that the getProperty(String)
method does not return a String representation of the configured value but rather a PythonPropertyValue
object.
This allows for the property’s value to be interpreted in different ways. For example, PythonPropertyValue.getValue()
returns the String representation
of the value. PythonPropertyValue.asInteger()
returns None
or an integer representation of the value.
PythonPropertyValue.getTimePeriod( nifiapi.properties.TimeUnit )
can be used to retrieve the configured value as some time period.
For example, if a property named "timeout" is set to a value of "30 sec" we could use
context.getProperty("timeout").asTimePeriod(TimeUnit.MILLISECONDS)
and this would return to us a value of 30000
. This allows for a better
user experience than requiring properties to follow a certain convention such as seconds or milliseconds while still allowing you, as a Processor
developer, to easily obtain the value in whatever units make the most sense for the use case.
The PythonPropertyValue.asControllerService()
method can be used in order to obtain a Controller Service that can be used by the Processor.
The PythonPropertyValue
object also provides the ability to call the evaluateAttributeExpressions(attributeMap=None)
method.
This can be used to evaluate the configured Expression Language. For example, if a value of ${filename}
is used for a property value,
we can use context.getProperty("my property").evaluateAttributeExpressions(flowFile).getValue()
in order to evaluate the Expression Language
expression and then get the String representation of the value.
flowfile
The FlowFile is a proxy to the Java InputFlowFile
object. This exposes the following methods:
getContentsAsBytes
: returns the contents of the FlowFile as a byte array. This method should be used conservatively, as it it loads entire contents
of the FlowFile into a byte array on the Java side, and then sends a copy to the Python side. As a result, the FlowFile’s contents are buffered into memory
twice, once on the Java heap and once in the Python process.
getContentsAsReader
: returns a Java BufferedReader
that can be used to read the contents of the FlowFile one line at a time. While this is only applicable
for textual content, it avoids loading the entire FlowFile’s contents into memory. However, each invocation to BufferedReader.readLine()
does require a call
to Java, so the performance may not compare to that of calling getContentsAsBytes
.
getSize
: returns the number of bytes in the FlowFile’s contents.
getAttribute(String name)
: returns the value of the FlowFile’s attribute with the given name, or None
if the FlowFile does not have
an attribute with that name.
getAttributes()
: returns a Python dictionary whose keys are FlowFile attributes' names and whose values are the associated attribute values.
FlowFileTransformResult
After the Processor has performed its task, the Processor must return an instance of nifiapi.flowfiletransform.FlowFileTransformResult
.
The constructor has a single required positional argument, the relationship
to route the FlowFile to. Additionally, if the contents of
the FlowFile are to be updated, the FlowFile’s new contents should be returned via the contents
argument. Any FlowFile attributes that
are to be added or modified may additionally provided using the attributes
argument.
RecordTransform
While the FlowFileTransform
API provides the ability to operate on a FlowFile at a time, the RecordTransform
API provides developers
with the opportunity to operate on a single Record at a time. For example, if a FlowFile is made up of many JSON Records, the RecordTransform
Processor can be used to operate on each individual record without worrying about whether the Records are colocated or not.
Implementations of this API must extend from the RecordTransform
base class and must also implement the following method:
def transform(self, context, record, schema, attributemap)
returning a RecordTransformResult
object.
The context
object is an implementation of the same ProcessContext
that is used in the FlowFileTransform
Processor
(see context). The record
is a Python dictionary that represents the record to operate on. Regardless of whether
the source of the record is JSON, CSV, Avro, or some other input format, this method is provided a Python dictionary. This makes
it far simpler to operate on the data within Python and means that the code is very portable, as it can operate on any format of
data.
The associated schema
object is an instance of a Java object, org.apache.nifi.record.serialization.RecordSchema
. This provides a
schema for the data. However, calls to the schema must be made over the socket to the Java side and, as such, are expensive.
Finally, the method signature provides an attributemap
. This attributemap
has two methods:
getAttribute(String name)
: returns the value of the FlowFile’s attribute with the given name, or None
if the FlowFile does not have
an attribute with that name.
getAttributes()
: returns a Python dictionary whose keys are FlowFile attributes' names and whose values are the associated attribute values.
Note that these two methods are identical to those in the InputFlowFile
class discussed above. This allows the attributemap
to be
provided to a PythonPropertyValue
in order to evaluate Expression Language. For example, we might determine the name of a record’s field to use
for some operation by calling:
field_name = context.getProperty("Field Name").evaluateAttributeExpressions(attributemap).getValue()
Finally, the method must return an instance of nifiapi.recordtransform.RecordTransformResult
.
The RecordTransformResult
constructor takes four optional named arguments:
record
: the transformed version of the Record. If the record is not supplied, or if None
is supplied, the input Record will be
dropped from the output.
schema
: the transformed schema. If this is not specified, the schema will be inferred. However, if the schema is specified, the schema
is binding, not the data. So, if a field is missing from the schema, for instance, it will be dropped from the data. And if the schema has a field
in it and there’s no corresponding value in the data, the field will be assumed to have a value of None
.
relationship
: the name of the Relationship to route the Record to. If not specified, the value will be routed to the "success" relationship.
However, the implementation may choose to expose relationships other than "success" and "failure" and route records accordingly. For example,
the implementation may want to record a Record to either "valid" or "invalid."
partition
: By default, all Records in a given incoming FlowFile will be written to a single output FlowFile (or, more accurately, the transformed version
of the Record will be, assuming that a value of None
is not returned for the result’s record
field). However, we may want to partition
the incoming data into separate output FlowFiles. For example, we could have incoming data that has a "country" field and want a separate output FlowFile
for each country. In this case, we would return a Python dictionary for the partition
argument that looks something like {'country': record['country']}
.
If the partition has more than one field in the dictionary, all fields in the dictionary must be the same value for two Records in order for
the Records to be written to the same output FlowFile.
FlowFileSource
The FlowFileSource
API provides a mechanism for creating a FlowFile and routing it based on its textual or binary contents.
In order to implement the FlowFileSource
API, a Python class must extend from the nifiapi.FlowFileSource
class
and implement the create(ProcessContext)
method, which returns a FlowFileSourceResult
. Notice, that the difference between
FlowFileSource’s create(ProcessContext)
and FlowFileTransform’s transform(ProcessContext, InputFlowFile)
methods is
that the former does not expect an InputFlowFile object. That is because processors based on the FlowFileSource
API
are "source" processors that do not accept incoming connections but are capable of creating FlowFiles themselves.
Implementing a Processor based on FlowFileSource
is very similar to implementing one based on FlowFileTransform
.
A simple implementation looks like this:
from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult class CreateFlowFile(FlowFileSource): class Java: implements = ['org.apache.nifi.python.processor.FlowFileSource'] class ProcessorDetails: version = '0.0.1-SNAPSHOT' description = '''A Python processor that creates FlowFiles.''' def __init__(self, **kwargs): pass def create(self, context): return FlowFileSourceResult(relationship = 'success', attributes = {'greeting': 'hello'}, contents = 'Hello World!')
As mentioned above, the create
method only takes one argument: the context (of type nifiapi.properties.ProcessContext
).
The return type is a FlowFileSourceResult
that indicates which Relationship the FlowFile should be transferred to,
any attributes that should be added to the FlowFile and the contents of the FlowFile. The relationship
is a required argument.
Each processor based on the FlowFileSource
API has a success
relationship and additional relationships can be
created in the Processor’s Python code. attributes
and contents
are both optional. If attributes
is not provided,
the FlowFile will still have the usual filename
, path
and uuid
attributes, but no additional ones.
If contents
is not provided, a FlowFile with no contents (only attributes) will be created.
In case there is no useful information to return from the create
method, return None
can be used instead of returning an
empty FlowFileSourceResult
. When create()
returns with None
, the processor does not produce any output.
When there is nothing to return, it might be useful to yield the processor’s resources and not schedule the processor
to run for the period of time defined by the processor’s Yield Duration. This can be achieved by calling
context.yield_resources()
from the processor’s create
method right before returning None
.
PropertyDescriptors
An important aspect of any software is the ability to configure it. With NiFi, Processors are configured by their properties.
In order to expose what properties are available, a Processor must expose a PropertyDescriptor
for the property. The PropertyDescriptor
contains all of the information necessary in order to convey how to configure the property.
A PropertyDescriptor
is created using the nifiapi.properties.PropertyDescriptor
class. The constructor takes two required positional
arguments: name
and description
. All other arguments are optional.
Typically, a Processor will have multiple Property Descriptors. These descriptors are then returned to the NiFi framework by implementing the following
method in the Processor (regardless of whether it is a FlowFileTransform
, a RecordTransform
or a FlowFileSource
):
def getPropertyDescriptors(self)
This method returns a list of PropertyDescriptors. The typical convention is to create the Property Descriptors in the Processor’s constructor and then return them in this method, such as:
from nifiapi.flowfiletransform import FlowFileTransform from nifiapi.properties import PropertyDescriptor, StandardValidators class PrettyPrintJson(FlowFileTransform): ... def __init__(self, **kwargs): super.__init(**kwargs) numspaces = PropertyDescriptor(name="Number of Spaces", description="Number of spaces to use for pretty-printing", validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR], default_value="4", required=True) self.descriptors = [numspaces] ... def getPropertyDescriptors(self): return self.descriptors
There are times, however, that Processor developer wants to allow users to specify their only properties. For example, we may allow users to enter multiple key/value pairs where the key is the name of a Record field to set and the value is the value to set it to. To accomplish this, we implement the following method:
def getDynamicPropertyDescriptor(self, propertyname):
Which returns a PropertyDescriptor. For example:
def getDynamicPropertyDescriptor(self, propertyname): return PropertyDescriptor(name=propertyname, description="A user-defined property", dynamic=True) # dynamic=True is optional and included here only for completeness' sake
If this method is not implemented and a user adds a property other than those that are explicitly supported, the Processor will become invalid. Of course, we might also specify explicit validators that can be used, etc.
Relationships
Each Processor in NiFi must route its outgoing data to some destination. In NiFi, those destinations are called "Relationships." Each Processor is responsible for declaring its Relationships.
Both the FlowFileTransform and RecordTransform Processors already have a Relationship named original
and one named failure.
The original
relationship should not be used by implementations. This is used only by the framework and allows the input FlowFile
to be passed on without modification. If the Processor cannot transform its input (because the data is not valid, for example),
the Processor may route the data to the failure
relationship.
By default, both implementations also have a success
relationship. However, Processors may override the Relationships that it
defines. It does this by implementing the following method:
def getRelationships(self)
This method returns a list or a set of nifiapi.relationship.Relationship
objects. If this method is implemented, the success
Relationship will not automatically be made available. It will need to be created and returned within this list, if it is to be used.
Regardless of which Relationships are exposed by the implementation, the failure
and original
will always be made available.
Unlike FlowFileTransform and RecordTransform Processors, FlowFileSource Processors only have a success
relationship by default.
Implementations can use this relationship to route the created FlowFiles. Additional relationships can be exposed by implementing
the getRelationships
method. In the case of FlowFileSource implementing getRelationships
does not remove the success
relationship.
Any relationship returned by getRelationships
appears besides the success
relationship.
ProcessorDetails and Java inner classes
As noted above, the ProcessorDetails
and Java
inner classes are important to Processors. The Java
inner class must be defined
on all Processors and must include a member named implements
that is a list of Java interfaces that the class implements. This is
important, as it allows the Py4J protocol to understand how to interact with this obect from the Java side.
The ProcessorDetails
class tells NiFi about the Processor so that it can allow configuration of the Processor seamlessly through the NiFi UI.
Additionally, it provides details about what is necessary in order to use the Processor.
The ProcessorDetails
class may have several different members:
version
: The implementation version of the Processor
description
: A description that can be presented in the UI to explain how the Processor is to be used. This may be more than
a single sentence but should be kept as a few sentences, or a short paragraph.
tags
: a list of Strings that indicates tags or keywords that are associated with the Processor. When a user adds a Processor to the
NiFi canvas via the UI, users may search for keywords in order to provide discoverability. For example, if a user were to search for
"CSV" any Processor whose name contains the letters "CSV" would should up. Additionally, any Processor that has a "CSV" tag would also show up.
dependencies
: A list of Strings that are PyPI dependencies that the Processor depends on. The format of these strings is the same
as would be provided to pip install
. See Adding Third-Party Dependencies for more information.
Logging in NiFi
NiFi logging works much the same way as in any other application, with one important difference. NiFi aims to make the
user interface intuitive and informative, and as part of that experience will surface log messages that are appropriate.
In order to accommodate this, Processors should not instantiate their own loggers. Instead, Processors should simply
make use of self.logger
. This will be injected into the Processor after the Processor has been created. Of course, it can’t
be made available before the Processor has been created, so it cannot be accessed from within the constructor. However, it can
be used anywhere else.
Lifecycle Methods
Often times, it is necessary to create expensive objects and reuse them instead of creating an object once, using it, and throwing it away.
In order to make this simpler to handle, NiFi provides a method named onScheduled
. This method is optionally implemented in the Processor.
If the method is implemented, it is defined as:
def onScheduled(self, context)
Where context
is a ProcessContext as described earlier. The method has no return value.
This method is invoked once whenever a Processor is scheduled to run (regardless of whether it’s being started due to user input, NiFi restart, etc.).
Similarly, it is often necessary to tear down resources when they are no longer necessary. This can be accomplished by implementing the following method:
def onStopped(self, context)
This method is called once whenever the Processor has been stopped and no longer has any active tasks. It is safe to assume
that there are no longer any invocations of the transform
method running when this method is called.
State Manager
The context
object that is the parameter of the transform()
, create()
, onScheduled()
and onStopped()
methods
provides access to State Manager through the getStateManager()
method. State Manager is responsible for providing
a simple API for storing and retrieving state. For information on State Manager, refer to the NiFi Developer’s Guide.
The Python StateManager object returned by context.getStateManager()
provides access to the underlying Java StateManager.
With the Python StateManager, the state is handled in the form of Python dictionaries (as opposed to Java Maps).
Just like in Java, StateManager can handle both LOCAL
and CLUSTER
state.
Should an error occur accessing the state using the StateManager’s methods, a StateException is thrown that can be caught and handled in the Python Processor’s code. This enables the Python developer to implement the Processor in such a way that when a state-related error occurs, the Processor can continue its operation without disruption. If a StateException is thrown by StateManager but not caught in the Processor’s code, the ProcessSession is rolled back and an error is logged.
Below is an example Processor that uses StateManager. This example assumes that the Processor is executed on primary node only, and only on one thread.
from nifiapi.componentstate import Scope, StateManager, StateException from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult class CreateFlowFile(FlowFileSource): class Java: implements = ['org.apache.nifi.python.processor.FlowFileSource'] class ProcessorDetails: version = '0.0.1-SNAPSHOT' description = '''A Python processor that creates FlowFiles and uses StateManager.''' tags = ['test', 'python', 'source', 'state'] def __init__(self, **kwargs): pass def onScheduled(self, context): try: self.state = context.getStateManager().getState(Scope.CLUSTER).toMap() except StateException as e: self.logger.warn('Failed to read processor state. ' + str(e)) self.state = dict() def create(self, context): old_value = int(self.state.get('FlowFileNumber', '0')) new_value = old_value + 1 new_state = {'FlowFileNumber': str(new_value)} try: context.getStateManager().setState(new_state, Scope.CLUSTER) self.state = new_state except StateException as e: self.logger.warn('Failed to save state. ' + str(e)) return FlowFileSourceResult(relationship='success', attributes=new_state, contents=None)
Documenting Use Cases
No matter how powerful a piece of software is, it has no value unless people are able to use it. To that end, documentation of Processors is
very important. While a description of the Processor should be provided in the ProcessorDetails
class and each PropertyDescriptor is expected to have a description,
it is usually helpful to also call out specific use cases that can be performed by the Processor. This is particularly important for Processors that perform
more generalized transformations on objects, where a single Processor may be capable of performing multiple tasks, based on its configuration.
The @use_case
Decorator
The @use_case
decorator, defined in the nifiapi.documentation
module can facilitate this. The decorator takes four arguments:
-
description
: A simple 1 (at most 2) sentence description of the use case. Generally, this should not include any extraneous details, such as caveats, etc. Those can be provided using thenotes
argument. The description is required. -
notes
: Most of the time, 1-2 sentences is sufficient to describe a use case. Those 1-2 sentence should then be returned by thedescription
. In the event that the description is not sufficient, details may be provided to further explain, by providing caveats, etc. This is optional. -
keywords
: An array of keywords that can be associated with the use case. This is optional. -
configuration
: A description of how to configure the Processor for this particular use case. This may include explicit values to set for some properties, and may include instructions on how to choose the appropriate value for other properties. The configuration is required.
A single Processor may have multiple @use_case
decorators.
The @multi_processor_use_case
Decorator
When designing and creating Processors, it is important to keep in mind the idea of loose coupling. One Processor should not be dependent on another Processor in order to perform its task. That being said, it is often advantageous to build Processors that are designed to work well together. For example, a Processor that is able to perform a listing of files in a directory can provide an important capability in and of itself. Similarly, a Processor that is able to ingest the contents of a specific file and make that file’s contents the contents of a FlowFile is also an important capability in and of itself. But far more powerful than either of these individual capabilities is the notion of being able to compose a flow that lists all files in a directory and then ingests each of those files as a FlowFile. This is done by using a combination of the two. As such, it is important that the two Processors be able to work together in such a way that the output of the first is easily understood as the input of the second.
In this case, it makes sense to document this composition of Processors as a use case so that users can understand how to compose such a pipeline. This is accomplished
by using the @multi_processor_use_case
decorator. This decorator is very similar to the The @use_case
Decorator but instead of a configuration
element, it has a
configurations
element, which is a list
of ProcessorConfiguration
objects, where each ProcessorConfiguration
object has both a processor_type
, which is the
name of the Processor, and a configuration
that explains how to configure that particular Processor. The configuration
element typically also explains how to connect
outbound Relationships.
For example, we might use these decorators as such:
@use_case(description="Retrieve the contents of a given file on disk and create a FlowFile from it without modifying the file", keywords=["file", "filesystem"], configuration=""" Set the 'Filename' property to the fully qualified path of the file to ingest Set the 'Completion Strategy' to 'None' """) @use_case(description="Retrieve the contents of a given file on disk and create a FlowFile from it, deleting the local file upon success", keywords=["file", "filesystem"], configuration=""" Set the 'Filename' property to the fully qualified path of the file to ingest Set the 'Completion Strategy' to 'Delete' """) @multi_processor_use_case( description="Ingest all files from a landing directory on the filesystem and delete them after ingesting them.", keywords=["file", "filesystem", "landing directory"], configurations=[ ProcessorConfiguration( processor_type="org.apache.nifi.processors.standard.ListFile", configuration=""" Set 'Input Directory' to the directory that files should be ingested from Set 'Input Directory Location' to 'Local' """ ), ProcessorConfiguration( processor_type="FetchFile", configuration=""" Set the 'Filename' property to `${absolute.path}/${filename}` Set the 'Completion Strategy' to 'Delete' """ ) ]) class FetchFile(FlowFileTransform):
Note that in this case, we are able to specifically tell the user that the Filename property of FetchFile should be set to the value ${absolute.path}/${filename}
because we know that the ListFile Processor will produce these attributes for us.
Requirements
The Python API requires that Python 3.9, 3.10, 3.11, or 3.12 is available on the machine hosting NiFi.
Each Processor may have its own list of requirements / dependencies. These are made available to the Processor by creating a separate environment for each Processor implementation (not for each instance of a Processor on the canvas). PyPI is then used to install these dependencies in that environment.
Processor Reloading
Often times, while developing a Processor, the easiest way to verify and modify its behavior is to make small tweaks and re-run the data. This is possible in NiFi without restarting. Once a Processor has been discovered and loaded, any changes to the Processor’s source code will take effect whenever the Processor is started again (or during certain other events, such as validation, while the Processor is stopped).
So we can easily update the source code for a Processor, start it, verify the results, stop the Processor, and update again as necessary. Or, more simply, click "Run Once" to verify the behavior; modify if necessary; and Run Once again. It is important to note, however, that if the Processor could not be successfully loaded the first time, NiFi may not monitor it for changes. Therefore, it’s important to ensure that the Processor is in a good working state before attempting to load it in NiFi. Otherwise, NiFi will need to be restarted in order to discover the Processor and load it again.
Because NiFi allows for multiple extension directories to be deployed, it might be helpful when developing a new extension to add the source directory where the extension is being developed as a NiFi extension source directory. This allows developers to develop processors using their IDE and allows NiFi to pickup any changes seamlessly as soon as the Processor is started.
Adding Third-Party Dependencies
Python based Processors can be a single module, or they can be bundled together as a Python package. How you specify third-party dependencies depends on how the Processor is packaged.
Package-level Dependencies
If one or more Processors are defined within a Python package, the package should define a requirements.txt
file that declares all third-party dependencies
that are necessary for any Processor in the package. The file structure will then typically look like this:
my-python-package/ │ ├── __init__.py │ ├── ProcessorA.py │ ├── ProcessorB.py │ └── requirements.txt
In this way, all of the requirements will be loaded from the requirements.txt
file once for the package. There will be no need to load the dependencies once for
ProcessorA and once for ProcessorB.
Processor-Level Dependencies
If your Processor is not a part of a Python package, its dependencies can be declared using the dependencies
member of the ProcessorDetails
inner class.
This is a list of Strings that indicate the PyPI modules that the Processor depends on. The format is the same format expected by PyPI.
This provides a convenience for declaring third-party dependencies without requiring that Processors be bundled into a package.
For example, to indicate that a Processor needs pandas
installed, the implementation might
look like this:
class PandasProcessor(FlowFileTransform): class Java: implements = ['org.apache.nifi.python.processor.FlowFileTransform'] class ProcessorDetails: version = '0.0.1-SNAPSHOT', dependencies = ['pandas']
However, it is often necessary to declare a specific version of a dependency. And it may also be necessary to define multiple dependencies. We can do that in this manner:
class PandasProcessor(FlowFileTransform): class Java: implements = ['org.apache.nifi.python.processor.FlowFileTransform'] class ProcessorDetails: version = '0.0.1-SNAPSHOT', dependencies = ['pandas', 'numpy==1.20.0']
Here, we accept any version of pandas
(though the latest is preferred), and we require version 1.20.0
of numpy
.
Dependency Isolation
The first time that a user creates a NiFi Processor of a given type, NiFi will create a separate Python env (venv) for the Processor.
It will use pip
to install the specified dependencies from PyPI only into the appropriate Python environment for that Processor.
Therefore, dependencies of one Processor are not made available to another Processor.
Beyond that, dependencies of one version of a Processor are not made available to other versions of the Processor. So, for example,
if we have two different versions of the same Processor made available, version 0.0.1
and version 0.0.2
, the dependencies that are
necessary for version 0.0.1
will not be made available to version 0.0.2
unless version 0.0.2
of the Processor also declares
those dependencies.
Some environments, however, cannot make use of pip
for package management. In an air-gapped environment, for example, or in
environments with strict security policies in place, pip
may not be available. In such a case, Python processors can be packaged
using the NiFi ARchive (NAR) format. This is a .zip file with the following specific layout, and uses a filename extension of .nar
:
my-nar-bundle.nar
+-- META-INF/
+-- MANIFEST.MF
+-- NAR-INF/
+-- bundled-dependencies/
+-- dependency1
+-- dependency2
+-- ...
+-- dependencyN
MyProcessor.py
Deploying a Developed Processor
Once a Processor has been developed, it can be made available in NiFi using one of two methods.
For Processors that have been packaged as a NAR file, the NAR file should be copied to NiFi’s lib/
directory or configured extensions directory.
For Processors that are not pre-packaged as a NAR, the Processor is deployed by copying the source of the Python extension to the $NIFI_HOME/python/extensions
directory by default.
The actual directory to look for extensions can be configured in nifi.properties
via properties that have the prefix nifi.python.extensions.source.directory.
.
For example, by default, nifi.python.extensions.source.directory.default
is set to ./python/extensions
. However, additional paths may be added by replacing default
in the property name with some other value.
Any .py
file found in the directory will be parsed and examined in order to determine whether or not it is a valid NiFi Processor.
In order to be found, the Processor must have a valid parent (FlowFileTransform
, RecordTransform
or FlowFileSource
) and must have an inner class named Java
with a implements = ['org.apache.nifi.python.processor.FlowFileTransform']
or implements = ['org.apache.nifi.python.processor.RecordFileTransform']
or implements = ['org.apache.nifi.python.processor.FlowFileSource']
.
This will allow NiFi to automatically discover the Processor.
Note, however, that if the Processor implementation is broken into multiple Python modules, those modules will not be made available by default. In order
to package a Processor along with its modules, the Processor and any related module must be added to a directory that is directly below the Extensions directory.
For example, if the WriteNumber.py
file contains a NiFi Processor and also depends on the ProcessorUtil.py
module, the directory structure would look like this:
NIFI_HOME/ - python/ - extensions/ ProcessorA.py ProcessorB.py write-number/ __init__.py ProcessorUtils.py WriteNumber.py
By packaging them together in a subdirectory, NiFi knows to expose the modules to one another. However, the ProcessorA module will have no access
to the ProcessorUtils
module. Only WriteNumber
will have access to it.
Troubleshooting
The Python environments that are created for Processors are located in NiFi’s working directory, under the directory structure
python/extensions/<module name>/<version>
. So, for example, if we have a Processor MyProcessor
that has version 0.0.1
,
its environment directory will be $NIFI_HOME/work/python/extensions/MyProcessor/0.0.1
. This directory may be modified by updating
the value of the nifi.python.working.directory
property in nifi.properties
.
For troubleshooting purposes, we may want to delete the environment directory. this can be done simply by stopping NiFi, deleting the appropriate environment directory, and restarting NiFi.
While you may delete the entire work
directory while NiFi is stopped, doing so may result in NiFi taking significantly longer to startup
the next time, as it must source all extensions' dependencies from PyPI, as well as expand all Java extensions' NAR files.
Debugging
It’s often helpful to attach a remote debugger to the Python process so that we can step through the code, examine variables, and execute code snippets, etc.
The method used will vary based on the IDE used. However, here we will look at how to enable remote debugging using VSCode. VSCode comes with a debugger named DebugPy.
It is important to note that every instance of a Processor and even each concurrent task in a Processor may end up using a separate Python process. Because of this, it is difficult to enable the Python process to listen for incoming connections because there may be many different processes. Instead, it is recommended to enable listening in VSCode and then allow the Processor itself to connect to the debugger.
To enable listening in VSCode, we must first create a launch.json
launch configuration. The launch.json
should have a
listen
section to tell it to listen on a particular port. Additionally, the pathMappings
must be setup to indicate the
local directory in which VSCode should find the Python code, and the remoteRoot
which is the directory in which NiFi should find
the Python code.
For example:
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: Remote Attach",
"type": "python",
"request": "attach",
"listen": {
"host": "localhost",
"port": 5678
},
"pathMappings": [
{
"localRoot": "${workspaceFolder}/nifi-python-test-extensions/src/main/resources/extensions",
"remoteRoot": "./python/extensions"
}
],
"justMyCode": true
}
]
}
We can then launch this using VSCode.
Next, we must tell the Processor to connect to the debugger. This requires updating the Processor’s code.
Firstly, the DebugPy module must be added as a dependency. Then the Processor needs to connect to the debugger.
To enable remote debugging on Processors themselves, it is necessary to update the Processor’s code to implement something akin to:
class MyProcessor(FlowFileTransform):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileTransform']
class ProcessorDetails:
version = '0.0.1-SNAPSHOT'
dependencies = ['debugpy']
def onScheduled(self, context):
try:
import debugpy
debugpy.connect(6688)
except e:
self.logger.error("Failed to connect to python debug listener")
It is important to note, however, that the code available to VSCode must exactly match the code that NiFi is using in order to ensure that breakpoints line up correctly. As a result, the code should be updated in VSCode and then copied into NiFi’s directory. At that point, NiFi does not require a restart, but the Processor must be stopped and started again.
Now, when the Processor is scheduled, it will connect to the VSCode debugger, and you can set breakpoints in the VSCode in order to debug the Processor.