Apache NiFi - Stateless


The past several releases of Apache NiFi have made significant improvements to the Stateless NiFi engine. If you are not familiar with Stateless NiFi, then I would recommend reading this overview first.

This post will examine the differences between running a flow in traditional NiFi vs. Stateless NiFi.

Traditional NiFi

As an example, let’s assume there is a Kafka topic with CDC events and we want to consume the events and apply them to another relational database. This can be achieved with a simple flow containing ConsumeKafka_2_6 connected to PutDatabaseRecord.

In traditional NiFi, each node has a set of internal repositories that are stored on local disk. The Flow File Repository contains the state of each flow file, including its attributes and location in the flow, and the Content Repository stores the content of each flow file.

Each execution of a processor is given a reference to a session that acts like a transaction for operating on flow files. If all operations complete successfully and the session is committed, then all updates are persisted to NiFi’s repositories. In the event that NiFi is restarted, all data is preserved in the repositories and the flow will start processing from the last committed state.

Let’s consider how the example flow will execute in traditional NiFi…

First, ConsumeKafka_2_6 will poll Kafka for available records. Then it will use the session to create a flow file and write the content of the records to the output stream of the flow file. The processor will then commit the NiFi session, followed by committing the Kafka offsets. The flow file will then be transferred to PutDatabaseRecord. The overall sequence is summarized in the following diagram.

A key point here is the ordering of committing the NiFi session before committing the Kafka offsets. This provides an “at least once” guarantee by ensuring the data is persisted in NiFi before acknowledging the offsets. If committing the offsets fails, possibly due to a consumer rebalance, NiFi will consume those same offsets again and receive duplicate data. If the ordering was reversed, it would be possible for the offsets to be successfully committed, followed by a failure to commit the NiFi session, which would create data loss and be considered “at most once”.

A second key point is that there is purposely no coordination across processors, meaning that each processor succeeds or fails independently of the other processors in the flow. Once ConsumeKafka_2_6 successfully executes, the consumed data is now persisted in NiFi, regardless of whether PutDatabaseRecord succeeds.

Let’s look at how this same flow would execute in Stateless NiFi.

Stateless NiFi

Stateless NiFi adheres to the same NiFi API as traditional NiFi, which means it can run the same processors and flow definitions, it just provides a different implementation of the underlying engine.

The primary abstraction is a StatelessDataFlow which can be triggered to execute. Each execution of the flow produces a result that can be considered a success or failure. A failure can occur from a processor throwing an exception, or from explicitly routing flow files to a named “failure port”.

A key difference in Stateless NiFi is around committing the NiFi session. A new commit method was introduced to ProcessSession with the following signature:

void commitAsync(Runnable onSuccess);

This gives the session implementation control over when to execute the given callback. In traditional NiFi the session can execute the callback as the last step of commitAsync, which produces the same behavior we looked at earlier. The stateless NiFi session can hold the callback and execute it only when the entire flow has completed successfully.

Let’s consider how the example flow will execute in Stateless NiFi…

When the StatelessDataFlow is triggered, ConsumeKafka_2_6 begins executing the same as it would in traditional NiFi, by polling Kafka for records, creating a flow file, and writing the records to the output stream of the flow file. It then calls commitAsync to commit the NiFi session and passes in a callback for committing the offsets to Kafka, which in this case will be held until later.

The flow file is then transferred to PutDatabaseRecord which attempts to apply the event to the database. Let’s assume PutDatabaseRecord was successful, then the overall execution of the flow completes successfully. The stateless engine then acknowledges the result which executes any held callbacks, and thus commits the offsets to Kafka. The overall sequence is summarized in the following diagram.

A key point here is that the execution of the entire flow is being treated like a single transaction. If a failure were to occur at PutDatabaseRecord, the overall execution would be considered a failure, and the onSuccess callbacks from commitAsync would never get executed. In this case, that would mean the offsets were never committed to Kafka, and the entire flow can be tried again for the same records.

Another type of failure scenario would be if Stateless NiFi crashed in the middle of executing the flow. Since Stateless NiFi generally uses in-memory repositories, any data that was in the middle of processing would be gone. However, since the source processor had not yet acknowledged receiving that data (i.e. the onSuccess callback never got executed), it would pull the same data again on the next execution.

ExecuteStateless

Previously, the primary mechanism to use Stateless NiFi was through the nifi-stateless binary which launches a standalone process to execute the flow.

The 1.15.0 release introduced a new processor called ExecuteStateless which can be used to run the Stateless engine from within traditional NiFi. This allow you to manage the execution of the Stateless flow using the traditional NiFi UI, as well as connect the output of the Stateless flow to follow on processing in the traditional NiFi flow.

In order to use the ExecuteStateless processor, you would first use traditional NiFi to create a process group containing the flow you want to execute with the Stateless engine. You would then download the flow definition, or commit the flow to a NiFi Registry instance. From there, you would configure ExecuteStateless with the location of the flow definition.

For a more in depth look at ExecuteStateless, check out Mark Payne’s YouTube Video on “Kafka Exactly Once with NiFi”.


blog comments powered by Disqus