Introduction

This guide is intended to provide an introduction and some guidance to developing extensions for Apache NiFi using Python. This guide is not intended to be an alternative to the NiFi Developers Guide document but rather a supplement to it. The normal Developer Guide is far more in depth and discusses more topics. However, that guide is targeted toward Java developers. The philosophies and guidance offered in that guide, generally, still hold for Python extensions, though.

Java/Python Communication

While NiFi is a Java based application, we do allow for native Python based processors. In order for this to work, it is essential that both the Java and Python processes be able to communicate with one another. To facilitate this, when a Python process is launched, a server is started on both the Java and Python sides. This server is started in such a way that it listens only on local network interfaces. That is, it is not possible to connect to either the Java or Python server from another machine. Connections must be made from localhost. This provides an important security layer.

There are objects on the Java side that must be made available to the Python side. Likewise, the Python side must return information to the Java side. For example, the Java application is responsible for storing the flow definition, such as the fact that some Processor exists, the configuration of that Processor, etc. It’s also responsible for maintaining the FlowFiles and their data. This information must be conveyed over the socket from the Java side to the Python side. Once a Python Processor performs its task and wants to route a given FlowFile to some relationship, this information must also be conveyed back to the Java side.

Fortunately, the NiFi API handles all of this and makes this seamless. This is handled by means of object proxies.

Object Proxies

Any time that a Java object must be made available to the Python API, it is made available via a proxy object. This means that in order to access a Java object from Python, we need to simply call the appropriate method on the Python proxy. When this method is called, a message is generated by the Python object and sent over the socket. That message is essentially an encoding of "Invoke method ABC on object XYZ, using arguments W, X, and Y."

For example, if we have an InputFlowFile object named flowFile and we want the filename attribute, we can do so by calling:

filename = flowFile.getAttribute('filename')

From the Python API perspective, this is all that is necessary. Behind the scenes, a message is written to the local socket that is an encoding of the message "Invoke the getAttribute method on the object with ID 679212, with String argument 'filename'". The Java process then receives this command, invokes the specified method on the object with the given identifier, and writes back to the socket the result of that method invocation. As a result, the Python side receives the value of the "filename" attribute.

This is important to understand, because it means that any method invocation that occurs on a Java object must be serialized, written over the socket, deserialized, and then the method can be invoked. The result must then be serialized, written over the socket, and the result deserialized on the Python side. While this is a fairly efficient process, it is not nearly as efficient as simply invoking a method natively. As a result, it is important to consider the overhead of method invocations on Java objects.

Object age-off

It is also important to understand that any time that an object is provided as an argument to a Python Processor, that object can only be accessed on the Python side as long as the object is made available on the Java side. Because the java side cannot store all objects indefinitely, some cleanup must happen. This cleanup happens immediately after the method is invoked.

That means that if the transform of a FlowFileTransform is called with a ProcessContext object, that object is available for use ONLY during the method invocation. As soon as the method returns (successfully or not), the object will no longer be available for use. As a result, objects provided to method invocations should not be stored for later use, such as assigning a value to self.processContext.

Referencing an object that is no longer accessible will result in an error similar to:

py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/Users/mpayne/devel/nifi/nifi-assembly/target/nifi-2.0.0-SNAPSHOT-bin/nifi-2.0.0-SNAPSHOT/python/framework/py4j/java_gateway.py", line 2466, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/Users/mpayne/devel/nifi/nifi-assembly/target/nifi-2.0.0-SNAPSHOT-bin/nifi-2.0.0-SNAPSHOT/./python/extensions/SetRecordField.py", line 22, in transform
    <Your Line of Python Code>
  File "/Users/mpayne/devel/nifi/nifi-assembly/target/nifi-2.0.0-SNAPSHOT-bin/nifi-2.0.0-SNAPSHOT/python/framework/py4j/java_gateway.py", line 1460, in __str__
    return self.toString()
  File "/Users/mpayne/devel/nifi/nifi-assembly/target/nifi-2.0.0-SNAPSHOT-bin/nifi-2.0.0-SNAPSHOT/python/framework/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/Users/mpayne/devel/nifi/nifi-assembly/target/nifi-2.0.0-SNAPSHOT-bin/nifi-2.0.0-SNAPSHOT/python/framework/py4j/protocol.py", line 330, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o15380.toString. Trace:
py4j.Py4JException: Target Object ID does not exist for this gateway :o15380
	at py4j.Gateway.invoke(Gateway.java:279)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at org.apache.nifi.py4j.server.NiFiGatewayConnection.run(NiFiGatewayConnection.java:91)
	at java.lang.Thread.run(Thread.java:750)

This indication "Target Object ID does not exist for this gateway…​" indicates that the Python code is attempting to access a Java object that is no longer accessible.

Processor API

In the initial release of the feature that makes Python a first class citizen for extensions, we will focus purely on Processors. Initially, there will be no ability to develop Controller Services in Python, though Python based Processors may make use of existing Controller Services.

The Processor API that exists for Java is more general and less prescriptive than the Python counterpart. The Java API allows for a very wide array of possibilities in terms of the types of components that may be built. The Python API, on the other hand, is more narrowly scoped and prescriptive. There are many reasons for this:

  • It is easier to encourage best practices for components with a more narrowly focused API.

  • Most of the use cases in which we see a need for Python-based extension points (based on previous use of scripting Processors such as ExecuteScript) tend to be around data manipulation and/or complex evaluation of data.

  • More narrowly focused APIs result in code that requires less boilerplate.

  • Calls from Python to Java (and vice versa) are far more expensive than native method calls. Having APIs that are more tailored toward specific use cases allows for fewer interactions between the two processes, which greatly improves performance.

As a result, the Python API consists of three different Processor classes that can be implemented: FlowFileTransform, RecordTransform and FlowFileSource. Others may emerge in the future.

FlowFileTransform

The FlowFileTransform API provides a mechanism for routing and transforming a FlowFile based on its attributes as well as its textual or binary contents. Contrast this with the RecordTransform API, which provides a mechanism for routing and transforming individual Records (such as JSON, Avro or CSV Records, for example).

In order to implement the FlowFileTransform API, a Python class must extend from the nifiapi.FlowFileTransform class and implement the transform(ProcessContext, InputFlowFile) method, which returns a FlowFileTransformResult.

Additionally, the Processor class must provide two pieces of information as subclasses: the Java interface that it implements (which will always be org.apache.nifi.python.processor.FlowFileTransform) and any details about the Processor, such as the version, a description, keywords/tags that might be associated with the Processor, etc. These will be discussed in more details below, in the ProcessorDetails and Java inner classes section.

As such, a simple implementation may look like this:

from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult

class WriteHelloWorld(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'

    def __init__(self, **kwargs):
        pass

    def transform(self, context, flowfile):
        return FlowFileTransformResult(relationship = "success", contents = "Hello World", attributes = {"greeting": "hello"})

The transform method is expected to take two arguments: the context (of type nifiapi.properties.ProcessContext) and the flowfile (of type InputFlowFile).

The return type is a FlowFileTransformResult that indicates which Relationship the FlowFile should be transferred to, the updated contents of the FlowFile, and any attributes that should be added to the FlowFile (or overwritten). The relationship is a required argument. The contents is optional. If the contents of the FlowFile are not to be updated, the contents should be unspecified or should be specified as None. The original FlowFile contents should not be returned, as it will have the same effect as passing None but will be more expensive, as the contents will be written out to the FlowFile. Likewise, it is more efficient to omit the attributes unless there is any attribute to add.

context

The context parameter is of type nifiapi.properties.ProcessContext. This class can be used to determine configuration, such as the Processor’s name (via context.getName()) and property values (via the context.getProperties() and context.getProperty(String propertyName)) methods.

Note that the getProperty(String) method does not return a String representation of the configured value but rather a PythonPropertyValue object. This allows for the property’s value to be interpreted in different ways. For example, PythonPropertyValue.getValue() returns the String representation of the value. PythonPropertyValue.asInteger() returns None or an integer representation of the value.

PythonPropertyValue.getTimePeriod( nifiapi.properties.TimeUnit ) can be used to retrieve the configured value as some time period. For example, if a property named "timeout" is set to a value of "30 sec" we could use context.getProperty("timeout").asTimePeriod(TimeUnit.MILLISECONDS) and this would return to us a value of 30000. This allows for a better user experience than requiring properties to follow a certain convention such as seconds or milliseconds while still allowing you, as a Processor developer, to easily obtain the value in whatever units make the most sense for the use case.

The PythonPropertyValue.asControllerService() method can be used in order to obtain a Controller Service that can be used by the Processor.

The PythonPropertyValue object also provides the ability to call the evaluateAttributeExpressions(attributeMap=None) method. This can be used to evaluate the configured Expression Language. For example, if a value of ${filename} is used for a property value, we can use context.getProperty("my property").evaluateAttributeExpressions(flowFile).getValue() in order to evaluate the Expression Language expression and then get the String representation of the value.

flowfile

The FlowFile is a proxy to the Java InputFlowFile object. This exposes the following methods:

getContentsAsBytes : returns the contents of the FlowFile as a byte array. This method should be used conservatively, as it it loads entire contents of the FlowFile into a byte array on the Java side, and then sends a copy to the Python side. As a result, the FlowFile’s contents are buffered into memory twice, once on the Java heap and once in the Python process.

getContentsAsReader : returns a Java BufferedReader that can be used to read the contents of the FlowFile one line at a time. While this is only applicable for textual content, it avoids loading the entire FlowFile’s contents into memory. However, each invocation to BufferedReader.readLine() does require a call to Java, so the performance may not compare to that of calling getContentsAsBytes.

getSize : returns the number of bytes in the FlowFile’s contents.

getAttribute(String name) : returns the value of the FlowFile’s attribute with the given name, or None if the FlowFile does not have an attribute with that name.

getAttributes() : returns a Python dictionary whose keys are FlowFile attributes' names and whose values are the associated attribute values.

FlowFileTransformResult

After the Processor has performed its task, the Processor must return an instance of nifiapi.flowfiletransform.FlowFileTransformResult. The constructor has a single required positional argument, the relationship to route the FlowFile to. Additionally, if the contents of the FlowFile are to be updated, the FlowFile’s new contents should be returned via the contents argument. Any FlowFile attributes that are to be added or modified may additionally provided using the attributes argument.

RecordTransform

While the FlowFileTransform API provides the ability to operate on a FlowFile at a time, the RecordTransform API provides developers with the opportunity to operate on a single Record at a time. For example, if a FlowFile is made up of many JSON Records, the RecordTransform Processor can be used to operate on each individual record without worrying about whether the Records are colocated or not. Implementations of this API must extend from the RecordTransform base class and must also implement the following method:

def transform(self, context, record, schema, attributemap)

returning a RecordTransformResult object.

The context object is an implementation of the same ProcessContext that is used in the FlowFileTransform Processor (see context). The record is a Python dictionary that represents the record to operate on. Regardless of whether the source of the record is JSON, CSV, Avro, or some other input format, this method is provided a Python dictionary. This makes it far simpler to operate on the data within Python and means that the code is very portable, as it can operate on any format of data.

The associated schema object is an instance of a Java object, org.apache.nifi.record.serialization.RecordSchema. This provides a schema for the data. However, calls to the schema must be made over the socket to the Java side and, as such, are expensive.

Finally, the method signature provides an attributemap. This attributemap has two methods:

getAttribute(String name) : returns the value of the FlowFile’s attribute with the given name, or None if the FlowFile does not have an attribute with that name.

getAttributes() : returns a Python dictionary whose keys are FlowFile attributes' names and whose values are the associated attribute values.

Note that these two methods are identical to those in the InputFlowFile class discussed above. This allows the attributemap to be provided to a PythonPropertyValue in order to evaluate Expression Language. For example, we might determine the name of a record’s field to use for some operation by calling:

field_name = context.getProperty("Field Name").evaluateAttributeExpressions(attributemap).getValue()

Finally, the method must return an instance of nifiapi.recordtransform.RecordTransformResult.

The RecordTransformResult constructor takes four optional named arguments:

record : the transformed version of the Record. If the record is not supplied, or if None is supplied, the input Record will be dropped from the output.

schema : the transformed schema. If this is not specified, the schema will be inferred. However, if the schema is specified, the schema is binding, not the data. So, if a field is missing from the schema, for instance, it will be dropped from the data. And if the schema has a field in it and there’s no corresponding value in the data, the field will be assumed to have a value of None.

relationship : the name of the Relationship to route the Record to. If not specified, the value will be routed to the "success" relationship. However, the implementation may choose to expose relationships other than "success" and "failure" and route records accordingly. For example, the implementation may want to record a Record to either "valid" or "invalid."

partition : By default, all Records in a given incoming FlowFile will be written to a single output FlowFile (or, more accurately, the transformed version of the Record will be, assuming that a value of None is not returned for the result’s record field). However, we may want to partition the incoming data into separate output FlowFiles. For example, we could have incoming data that has a "country" field and want a separate output FlowFile for each country. In this case, we would return a Python dictionary for the partition argument that looks something like {'country': record['country']}. If the partition has more than one field in the dictionary, all fields in the dictionary must be the same value for two Records in order for the Records to be written to the same output FlowFile.

FlowFileSource

The FlowFileSource API provides a mechanism for creating a FlowFile and routing it based on its textual or binary contents.

In order to implement the FlowFileSource API, a Python class must extend from the nifiapi.FlowFileSource class and implement the create(ProcessContext) method, which returns a FlowFileSourceResult. Notice, that the difference between FlowFileSource’s create(ProcessContext) and FlowFileTransform’s transform(ProcessContext, InputFlowFile) methods is that the former does not expect an InputFlowFile object. That is because processors based on the FlowFileSource API are "source" processors that do not accept incoming connections but are capable of creating FlowFiles themselves.

Implementing a Processor based on FlowFileSource is very similar to implementing one based on FlowFileTransform. A simple implementation looks like this:

from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult

class CreateFlowFile(FlowFileSource):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileSource']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = '''A Python processor that creates FlowFiles.'''

    def __init__(self, **kwargs):
        pass

    def create(self, context):
        return FlowFileSourceResult(relationship = 'success', attributes = {'greeting': 'hello'}, contents = 'Hello World!')

As mentioned above, the create method only takes one argument: the context (of type nifiapi.properties.ProcessContext).

The return type is a FlowFileSourceResult that indicates which Relationship the FlowFile should be transferred to, any attributes that should be added to the FlowFile and the contents of the FlowFile. The relationship is a required argument. Each processor based on the FlowFileSource API has a success relationship and additional relationships can be created in the Processor’s Python code. attributes and contents are both optional. If attributes is not provided, the FlowFile will still have the usual filename, path and uuid attributes, but no additional ones. If contents is not provided, a FlowFile with no contents (only attributes) will be created. In case there is no useful information to return from the create method, return None can be used instead of returning an empty FlowFileSourceResult. When create() returns with None, the processor does not produce any output. When there is nothing to return, it might be useful to yield the processor’s resources and not schedule the processor to run for the period of time defined by the processor’s Yield Duration. This can be achieved by calling context.yield_resources() from the processor’s create method right before returning None.

PropertyDescriptors

An important aspect of any software is the ability to configure it. With NiFi, Processors are configured by their properties. In order to expose what properties are available, a Processor must expose a PropertyDescriptor for the property. The PropertyDescriptor contains all of the information necessary in order to convey how to configure the property.

A PropertyDescriptor is created using the nifiapi.properties.PropertyDescriptor class. The constructor takes two required positional arguments: name and description. All other arguments are optional.

Typically, a Processor will have multiple Property Descriptors. These descriptors are then returned to the NiFi framework by implementing the following method in the Processor (regardless of whether it is a FlowFileTransform, a RecordTransform or a FlowFileSource):

def getPropertyDescriptors(self)

This method returns a list of PropertyDescriptors. The typical convention is to create the Property Descriptors in the Processor’s constructor and then return them in this method, such as:

from nifiapi.flowfiletransform import FlowFileTransform
from nifiapi.properties import PropertyDescriptor, StandardValidators

class PrettyPrintJson(FlowFileTransform):
...
    def __init__(self, **kwargs):
        super.__init(**kwargs)

        numspaces = PropertyDescriptor(name="Number of Spaces",
            description="Number of spaces to use for pretty-printing",
            validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
            default_value="4",
            required=True)
        self.descriptors = [numspaces]

...

    def getPropertyDescriptors(self):
        return self.descriptors

There are times, however, that Processor developer wants to allow users to specify their only properties. For example, we may allow users to enter multiple key/value pairs where the key is the name of a Record field to set and the value is the value to set it to. To accomplish this, we implement the following method:

def getDynamicPropertyDescriptor(self, propertyname):

Which returns a PropertyDescriptor. For example:

def getDynamicPropertyDescriptor(self, propertyname):
    return PropertyDescriptor(name=propertyname,
        description="A user-defined property",
        dynamic=True)   # dynamic=True is optional and included here only for completeness' sake

If this method is not implemented and a user adds a property other than those that are explicitly supported, the Processor will become invalid. Of course, we might also specify explicit validators that can be used, etc.

Relationships

Each Processor in NiFi must route its outgoing data to some destination. In NiFi, those destinations are called "Relationships." Each Processor is responsible for declaring its Relationships.

Both the FlowFileTransform and RecordTransform Processors already have a Relationship named original and one named failure. The original relationship should not be used by implementations. This is used only by the framework and allows the input FlowFile to be passed on without modification. If the Processor cannot transform its input (because the data is not valid, for example), the Processor may route the data to the failure relationship.

By default, both implementations also have a success relationship. However, Processors may override the Relationships that it defines. It does this by implementing the following method:

def getRelationships(self)

This method returns a list or a set of nifiapi.relationship.Relationship objects. If this method is implemented, the success Relationship will not automatically be made available. It will need to be created and returned within this list, if it is to be used. Regardless of which Relationships are exposed by the implementation, the failure and original will always be made available.

Unlike FlowFileTransform and RecordTransform Processors, FlowFileSource Processors only have a success relationship by default. Implementations can use this relationship to route the created FlowFiles. Additional relationships can be exposed by implementing the getRelationships method. In the case of FlowFileSource implementing getRelationships does not remove the success relationship. Any relationship returned by getRelationships appears besides the success relationship.

ProcessorDetails and Java inner classes

As noted above, the ProcessorDetails and Java inner classes are important to Processors. The Java inner class must be defined on all Processors and must include a member named implements that is a list of Java interfaces that the class implements. This is important, as it allows the Py4J protocol to understand how to interact with this obect from the Java side.

The ProcessorDetails class tells NiFi about the Processor so that it can allow configuration of the Processor seamlessly through the NiFi UI. Additionally, it provides details about what is necessary in order to use the Processor. The ProcessorDetails class may have several different members:

version : The implementation version of the Processor

description : A description that can be presented in the UI to explain how the Processor is to be used. This may be more than a single sentence but should be kept as a few sentences, or a short paragraph.

tags : a list of Strings that indicates tags or keywords that are associated with the Processor. When a user adds a Processor to the NiFi canvas via the UI, users may search for keywords in order to provide discoverability. For example, if a user were to search for "CSV" any Processor whose name contains the letters "CSV" would should up. Additionally, any Processor that has a "CSV" tag would also show up.

dependencies : A list of Strings that are PyPI dependencies that the Processor depends on. The format of these strings is the same as would be provided to pip install. See Adding Third-Party Dependencies for more information.

Logging in NiFi

NiFi logging works much the same way as in any other application, with one important difference. NiFi aims to make the user interface intuitive and informative, and as part of that experience will surface log messages that are appropriate. In order to accommodate this, Processors should not instantiate their own loggers. Instead, Processors should simply make use of self.logger. This will be injected into the Processor after the Processor has been created. Of course, it can’t be made available before the Processor has been created, so it cannot be accessed from within the constructor. However, it can be used anywhere else.

Lifecycle Methods

Often times, it is necessary to create expensive objects and reuse them instead of creating an object once, using it, and throwing it away. In order to make this simpler to handle, NiFi provides a method named onScheduled. This method is optionally implemented in the Processor. If the method is implemented, it is defined as:

def onScheduled(self, context)

Where context is a ProcessContext as described earlier. The method has no return value. This method is invoked once whenever a Processor is scheduled to run (regardless of whether it’s being started due to user input, NiFi restart, etc.).

Similarly, it is often necessary to tear down resources when they are no longer necessary. This can be accomplished by implementing the following method:

def onStopped(self, context)

This method is called once whenever the Processor has been stopped and no longer has any active tasks. It is safe to assume that there are no longer any invocations of the transform method running when this method is called.

Documenting Use Cases

No matter how powerful a piece of software is, it has no value unless people are able to use it. To that end, documentation of Processors is very important. While a description of the Processor should be provided in the ProcessorDetails class and each PropertyDescriptor is expected to have a description, it is usually helpful to also call out specific use cases that can be performed by the Processor. This is particularly important for Processors that perform more generalized transformations on objects, where a single Processor may be capable of performing multiple tasks, based on its configuration.

The @use_case Decorator

The @use_case decorator, defined in the nifiapi.documentation module can facilitate this. The decorator takes four arguments:

  • description: A simple 1 (at most 2) sentence description of the use case. Generally, this should not include any extraneous details, such as caveats, etc. Those can be provided using the notes argument. The description is required.

  • notes: Most of the time, 1-2 sentences is sufficient to describe a use case. Those 1-2 sentence should then be returned by the description. In the event that the description is not sufficient, details may be provided to further explain, by providing caveats, etc. This is optional.

  • keywords: An array of keywords that can be associated with the use case. This is optional.

  • configuration: A description of how to configure the Processor for this particular use case. This may include explicit values to set for some properties, and may include instructions on how to choose the appropriate value for other properties. The configuration is required.

A single Processor may have multiple @use_case decorators.

The @multi_processor_use_case Decorator

When designing and creating Processors, it is important to keep in mind the idea of loose coupling. One Processor should not be dependent on another Processor in order to perform its task. That being said, it is often advantageous to build Processors that are designed to work well together. For example, a Processor that is able to perform a listing of files in a directory can provide an important capability in and of itself. Similarly, a Processor that is able to ingest the contents of a specific file and make that file’s contents the contents of a FlowFile is also an important capability in and of itself. But far more powerful than either of these individual capabilities is the notion of being able to compose a flow that lists all files in a directory and then ingests each of those files as a FlowFile. This is done by using a combination of the two. As such, it is important that the two Processors be able to work together in such a way that the output of the first is easily understood as the input of the second.

In this case, it makes sense to document this composition of Processors as a use case so that users can understand how to compose such a pipeline. This is accomplished by using the @multi_processor_use_case decorator. This decorator is very similar to the The @use_case Decorator but instead of a configuration element, it has a configurations element, which is a list of ProcessorConfiguration objects, where each ProcessorConfiguration object has both a processor_type, which is the name of the Processor, and a configuration that explains how to configure that particular Processor. The configuration element typically also explains how to connect outbound Relationships.

For example, we might use these decorators as such:

@use_case(description="Retrieve the contents of a given file on disk and create a FlowFile from it without modifying the file",
          keywords=["file", "filesystem"],
          configuration="""
                Set the 'Filename' property to the fully qualified path of the file to ingest
                Set the 'Completion Strategy' to 'None'
          """)
@use_case(description="Retrieve the contents of a given file on disk and create a FlowFile from it, deleting the local file upon success",
          keywords=["file", "filesystem"],
          configuration="""
                Set the 'Filename' property to the fully qualified path of the file to ingest
                Set the 'Completion Strategy' to 'Delete'
          """)
@multi_processor_use_case(
      description="Ingest all files from a landing directory on the filesystem and delete them after ingesting them.",
      keywords=["file", "filesystem", "landing directory"],
      configurations=[
          ProcessorConfiguration(
              processor_type="org.apache.nifi.processors.standard.ListFile",
              configuration="""
                  Set 'Input Directory' to the directory that files should be ingested from
                  Set 'Input Directory Location' to 'Local'
                  """
          ),
          ProcessorConfiguration(
              processor_type="FetchFile",
              configuration="""
                Set the 'Filename' property to `${absolute.path}/${filename}`
                Set the 'Completion Strategy' to 'Delete'
              """
          )
      ])
class FetchFile(FlowFileTransform):

Note that in this case, we are able to specifically tell the user that the Filename property of FetchFile should be set to the value ${absolute.path}/${filename} because we know that the ListFile Processor will produce these attributes for us.

Requirements

The Python API requires that Python 3.9, 3.10, 3.11, or 3.12 is available on the machine hosting NiFi.

Each Processor may have its own list of requirements / dependencies. These are made available to the Processor by creating a separate environment for each Processor implementation (not for each instance of a Processor on the canvas). PyPI is then used to install these dependencies in that environment.

Processor Reloading

Often times, while developing a Processor, the easiest way to verify and modify its behavior is to make small tweaks and re-run the data. This is possible in NiFi without restarting. Once a Processor has been discovered and loaded, any changes to the Processor’s source code will take effect whenever the Processor is started again (or during certain other events, such as validation, while the Processor is stopped).

So we can easily update the source code for a Processor, start it, verify the results, stop the Processor, and update again as necessary. Or, more simply, click "Run Once" to verify the behavior; modify if necessary; and Run Once again. It is important to note, however, that if the Processor could not be successfully loaded the first time, NiFi may not monitor it for changes. Therefore, it’s important to ensure that the Processor is in a good working state before attempting to load it in NiFi. Otherwise, NiFi will need to be restarted in order to discover the Processor and load it again.

Because NiFi allows for multiple extension directories to be deployed, it might be helpful when developing a new extension to add the source directory where the extension is being developed as a NiFi extension source directory. This allows developers to develop processors using their IDE and allows NiFi to pickup any changes seamlessly as soon as the Processor is started.

Adding Third-Party Dependencies

Python based Processors can be a single module, or they can be bundled together as a Python package. How you specify third-party dependencies depends on how the Processor is packaged.

Package-level Dependencies

If one or more Processors are defined within a Python package, the package should define a requirements.txt file that declares all third-party dependencies that are necessary for any Processor in the package. The file structure will then typically look like this:

my-python-package/
│
├── __init__.py
│
├── ProcessorA.py
│
├── ProcessorB.py
│
└── requirements.txt

In this way, all of the requirements will be loaded from the requirements.txt file once for the package. There will be no need to load the dependencies once for ProcessorA and once for ProcessorB.

Processor-Level Dependencies

If your Processor is not a part of a Python package, its dependencies can be declared using the dependencies member of the ProcessorDetails inner class. This is a list of Strings that indicate the PyPI modules that the Processor depends on. The format is the same format expected by PyPI. This provides a convenience for declaring third-party dependencies without requiring that Processors be bundled into a package.

For example, to indicate that a Processor needs pandas installed, the implementation might look like this:

class PandasProcessor(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT',
        dependencies = ['pandas']

However, it is often necessary to declare a specific version of a dependency. And it may also be necessary to define multiple dependencies. We can do that in this manner:

class PandasProcessor(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT',
        dependencies = ['pandas', 'numpy==1.20.0']

Here, we accept any version of pandas (though the latest is preferred), and we require version 1.20.0 of numpy.

Dependency Isolation

The first time that a user creates a NiFi Processor of a given type, NiFi will create a separate Python env (venv) for the Processor. It will use pip to install the specified dependencies from PyPI only into the appropriate Python environment for that Processor. Therefore, dependencies of one Processor are not made available to another Processor.

Beyond that, dependencies of one version of a Processor are not made available to other versions of the Processor. So, for example, if we have two different versions of the same Processor made available, version 0.0.1 and version 0.0.2, the dependencies that are necessary for version 0.0.1 will not be made available to version 0.0.2 unless version 0.0.2 of the Processor also declares those dependencies.

Some environments, however, cannot make use of pip for package management. In an air-gapped environment, for example, or in environments with strict security policies in place, pip may not be available. In such a case, Python processors can be packaged using the NiFi ARchive (NAR) format. This is a .zip file with the following specific layout, and uses a filename extension of .nar:

my-nar-bundle.nar
+-- META-INF/
    +-- MANIFEST.MF
+-- NAR-INF/
    +-- bundled-dependencies/
        +-- dependency1
        +-- dependency2
        +-- ...
        +-- dependencyN
MyProcessor.py

Deploying a Developed Processor

Once a Processor has been developed, it can be made available in NiFi using one of two methods. For Processors that have been packaged as a NAR file, the NAR file should be copied to NiFi’s lib/ directory or configured extensions directory. For Processors that are not pre-packaged as a NAR, the Processor is deployed by copying the source of the Python extension to the $NIFI_HOME/python/extensions directory by default. The actual directory to look for extensions can be configured in nifi.properties via properties that have the prefix nifi.python.extensions.source.directory.. For example, by default, nifi.python.extensions.source.directory.default is set to ./python/extensions. However, additional paths may be added by replacing default in the property name with some other value.

Any .py file found in the directory will be parsed and examined in order to determine whether or not it is a valid NiFi Processor. In order to be found, the Processor must have a valid parent (FlowFileTransform, RecordTransform or FlowFileSource) and must have an inner class named Java with a implements = ['org.apache.nifi.python.processor.FlowFileTransform'] or implements = ['org.apache.nifi.python.processor.RecordFileTransform'] or implements = ['org.apache.nifi.python.processor.FlowFileSource']. This will allow NiFi to automatically discover the Processor.

Note, however, that if the Processor implementation is broken into multiple Python modules, those modules will not be made available by default. In order to package a Processor along with its modules, the Processor and any related module must be added to a directory that is directly below the Extensions directory. For example, if the WriteNumber.py file contains a NiFi Processor and also depends on the ProcessorUtil.py module, the directory structure would look like this:

NIFI_HOME/
    - python/
        - extensions/
            ProcessorA.py
            ProcessorB.py
            write-number/
                __init__.py
                ProcessorUtils.py
                WriteNumber.py

By packaging them together in a subdirectory, NiFi knows to expose the modules to one another. However, the ProcessorA module will have no access to the ProcessorUtils module. Only WriteNumber will have access to it.

Troubleshooting

The Python environments that are created for Processors are located in NiFi’s working directory, under the directory structure python/extensions/<module name>/<version>. So, for example, if we have a Processor MyProcessor that has version 0.0.1, its environment directory will be $NIFI_HOME/work/python/extensions/MyProcessor/0.0.1. This directory may be modified by updating the value of the nifi.python.working.directory property in nifi.properties.

For troubleshooting purposes, we may want to delete the environment directory. this can be done simply by stopping NiFi, deleting the appropriate environment directory, and restarting NiFi.

While you may delete the entire work directory while NiFi is stopped, doing so may result in NiFi taking significantly longer to startup the next time, as it must source all extensions' dependencies from PyPI, as well as expand all Java extensions' NAR files.

Debugging

It’s often helpful to attach a remote debugger to the Python process so that we can step through the code, examine variables, and execute code snippets, etc.

The method used will vary based on the IDE used. However, here we will look at how to enable remote debugging using VSCode. VSCode comes with a debugger named DebugPy.

It is important to note that every instance of a Processor and even each concurrent task in a Processor may end up using a separate Python process. Because of this, it is difficult to enable the Python process to listen for incoming connections because there may be many different processes. Instead, it is recommended to enable listening in VSCode and then allow the Processor itself to connect to the debugger.

To enable listening in VSCode, we must first create a launch.json launch configuration. The launch.json should have a listen section to tell it to listen on a particular port. Additionally, the pathMappings must be setup to indicate the local directory in which VSCode should find the Python code, and the remoteRoot which is the directory in which NiFi should find the Python code.

For example:

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Remote Attach",
            "type": "python",
            "request": "attach",
            "listen": {
                "host": "localhost",
                "port": 5678
            },
            "pathMappings": [
                {
                    "localRoot": "${workspaceFolder}/nifi-python-test-extensions/src/main/resources/extensions",
                    "remoteRoot": "./python/extensions"
                }
            ],
            "justMyCode": true
        }
    ]
}

We can then launch this using VSCode.

Next, we must tell the Processor to connect to the debugger. This requires updating the Processor’s code.

Firstly, the DebugPy module must be added as a dependency. Then the Processor needs to connect to the debugger.

To enable remote debugging on Processors themselves, it is necessary to update the Processor’s code to implement something akin to:

class MyProcessor(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        dependencies = ['debugpy']

    def onScheduled(self, context):
        try:
            import debugpy
            debugpy.connect(6688)
        except e:
            self.logger.error("Failed to connect to python debug listener")

It is important to note, however, that the code available to VSCode must exactly match the code that NiFi is using in order to ensure that breakpoints line up correctly. As a result, the code should be updated in VSCode and then copied into NiFi’s directory. At that point, NiFi does not require a restart, but the Processor must be stopped and started again.

Now, when the Processor is scheduled, it will connect to the VSCode debugger, and you can set breakpoints in the VSCode in order to debug the Processor.