Understanding Naiad: A timely dataflow system for batch and stream processing

A large scale distributed system that can support cyclic dataflows.

Back in 2013 and even now, every distributed programming model was tightly coupled with the engine that executed this model and so interoperability was an issue. It is an issue even today but not that much — thanks to systems like Naiad and Spark. Before Naiad, we had systems like MapReduce for batch processing, time stream for stream processing, and Giraffe for graph/vertex processing.

Naiad is a distributed cyclic dataflow system that offers both batch and stream processing on top of the same engine. It introduced a new computational model: a timely dataflow that enables Naiad to process cyclic dataflows. This helps in building applications on the lines of graph analytics. It has an efficient mechanism of incremental computations like that of the Page-Rank algorithm. Naiad demonstrated high throughput and low latency.

I believe understanding Naiad would strengthen the concepts of generalized distributed computing platforms and hence, writing this blog post.

Why do we need incremental and streaming workflows?

The PageRank algorithm reads text files to parse URLs and then counts each one of them to rank URL pages. Usually, Apache Spark is used for this purpose but we needed a more refined way in which input data can be updated.

If the input data is updated, in Spark, we would need to rerun the entire thing. However, Spark has support for streaming but that does not yield high throughput as Naiad, simply because the computational model is not supported for streaming data. Note: I am not considering Structured Streaming of Spark that could perform this job, I am considering the base research paper of vanilla Spark introduced to us in ‘13.

In Spark, we can have incremental computations like that of the PageRank algorithm but we cannot have cycles in our graphs. Spark uses Directed Acyclic Graphs(DAG) for computation. Even when we use loops, Spark unfolds it into DAGs by adding new RDDs [2]. RDDs are Resilient Distributed Datasets that are read-only data structures. This makes Spark create new RDDs at each step of the computation and so the size of the DAG in Spark increases, very much.

In Naiad, we can have arbitrary cycles in our dataflow.

Dataflow graph in Naiad [1]

As you can see, Loop Context in the above graph is a cycle with an ingress(I) and an egress(E) node along with a feedback (F) node.

Loop Contexts form a tree of hierarchies where the Ingress node is the root and data going through ingress node is considered going down the tree while taking an exit from this loop via the Egress node is going up the tree.

On the whole, Naiad is special because we can have complex dataflow graphs.

Naiad will let us run asynchronous jobs without any coordination and whenever it is necessary, it will also have us run synchronous jobs to produce consistent results.

How do dataflow graphs work?

Dataflow graphs use logical timestamps [3]. These logical timestamps contain epoch and loop counter. They are used to track the computation, loop number of the input data.

The loop counter ends up with a number that would be the depth of the context tree explained above. In theory, we can have as many loop counters as needed but as per the paper, not more than a couple of them are needed in practice.

Naiad has a distributed protocol for tracking progress in distributed graphs. Each vertex in the Naiad graph receives a notification on timestamp t and pushes the data ahead. It is assured that once a notification is received, no data before t will ever arrive or be processed i.e. no vertex before the receiver will ever receive notification of timestamp earlier than or equal to t: “until t” is guaranteed. This assurance is helpful while developing applications over Naiad.

Progress Tracking in Naiad

We shall see two low-level APIs of Naiad: ONRECV and ONNOTIFY.

ONRECV has three arguments: edge(between two vertices u and v), message, and timestamp. This function makes vertex v receive data from vertex u.

ONNOTIFY has one argument: timestamp(t) where vertex v receives a notification at time=t.

We can have multiple epochs flying around in the loop at a given time and that is exactly how Naiad offers parallel synchronous data processing. Parallel because of multiple epochs, synchronous because of the epoch used: we could have conditions like if t%5==0, deliver a notification to v; if t%2 ==0: deliver a notification to u

Spark v/s Naiad: Fault Tolerance

One drawback of this computation model is that the fault tolerance of Naiad is slow. It uses synchronous checkpoints where it stops the current execution, takes the checkpoint, and then resumes. This latency is visible to the end-user. The only other alternative is to implement logging but that would significantly increase the disk I/O.

In Spark, we can see a better fault tolerance mechanism where it implements lineage-based fault tolerance. This is not possible in Naiad as Naiad has fine-grained updates and not idempotent RDDs. In Naiad, the operator outputs individual records whereas, in Spark, it outputs RDDs and so, in Naiad, lineage to maintain becomes very huge.

Conclusion

Taking a look at performance evaluation [1], we can conclude that Naiad’s performance makes it a powerful general-purpose programming platform and model for both batch and streaming computation. The biggest advantage of using Naiad is to be able to decouple the execution engine with the distributed systems protocol.

References

[1] Naiad: A Timely Dataflow System, Microsoft Research Silicon Valley

[2] https://spark.apache.org/docs/latest/rdd-programming-guide.html

[3] https://towardsdatascience.com/understanding-lamport-timestamps-with-pythons-multiprocessing-library-12a6427881c6

Code + Data.