One of the key challenges for OpenDNS (now part of Cisco) is handling a massive number of DNS queries while simultaneously running classification models on them as fast as possible. Today, we’re going to talk about Avalanche, a real-time data processing framework currently used in our research cluster.
First, we will run some numbers to evaluate the scale of our requirements and make smart architecture design decisions. Second, we will look at similarities with other technical fields (such as quantitative trading in finance) that face very similar problems, and see if we can find common ground. Finally, we will present some details and key elements of the Avalanche project.
Evaluating the traffic
Before jumping into any design or implementation, we need to look at the amount of traffic that OpenDNS sees during slow and peak hours every day. I decided to look at the log collection from one of our resolvers located in Amsterdam. This resolver consists of several machines that handle DNS queries, of which we’re going to consider only one (m1). It also sees various types of traffic, but here we will focus only on authoritative traffic (Authlogs) and recursive traffic (Querylogs). It is important to mention that the Amsterdam resolver, despite its significant amount of traffic, is neither the biggest nor the smallest of our resolvers. Let’s take a look:
Amsterdam is at GMT+1, and noon is the peak moment of the day, whereas traffic around midnight is typically very slow. The upper part of the table shows the amount of data contained in each log chunk. Important to note here: each resolver produces one chunk every 10 minutes. The lower part of the table shows the same information converted to queries per second.
- For Authlogs, we observe between 686.75 and 941.25 queries per second.
- For Querylogs, we observe between 5525.26 and 10246.66 queries per second.
Put differently, Authlogs peak at one message every 1.062 milliseconds, whereas Querylogs peak at one message every 97.6 microseconds. These figures give a better sense of the traffic volume and the time resolution we’re dealing with here. Again, keep in mind that we have many more resolvers all around the world, each made up of several machines averaging around these numbers.
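To make the conversion explicit, here is a small Python sketch of the arithmetic. It assumes each chunk figure is a count of messages covering exactly 10 minutes (600 seconds); the function names are purely illustrative.

```python
CHUNK_SECONDS = 10 * 60  # one log chunk is produced every 10 minutes


def queries_per_second(messages_per_chunk: int) -> float:
    """Average rate over one chunk, assuming the chunk size is a message count."""
    return messages_per_chunk / CHUNK_SECONDS


def interval_us(qps: float) -> float:
    """Mean gap between two consecutive messages, in microseconds."""
    return 1_000_000 / qps


if __name__ == "__main__":
    # Peak rates quoted in the list above
    for name, peak_qps in [("Authlogs", 941.25), ("Querylogs", 10246.66)]:
        print(f"{name}: one message every {interval_us(peak_qps):.1f} us")
    # Authlogs: one message every 1062.4 us
    # Querylogs: one message every 97.6 us
```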
Analogy with the finance world
Knowing now that we have to build a log processor that will handle hundreds of data centers and apply active classification/decision techniques at the microsecond level on each one, it’s hard not to see a parallel with quantitative trading in finance (also known as algorithmic trading).
In fact, the similarity is striking. We see volumes of queries; financial firms see volumes of trades. We have to deal with time series; so do they. Our data is centered around domain names (strings); theirs is centered around ticker symbols. Our logs contain additional data about the queries (Authlogs and Querylogs carry different information); they see additional information about each trade. Our job consists of analyzing traffic patterns to detect anomalies and apply enforcement decisions and actions to Internet traffic; they do much the same to compute investment strategies. They backtest their strategies on simulations to measure their efficacy; we replay historical logs to do pretty much the same (confusion matrices, hit rates, etc.). And finally, financial firms use external indicators (e.g., sentiment analysis on the news), while we take third-party APIs into account to cross-check our results.
Obviously, there are also differences. We see a virtually infinite number of domains; they have a limited number of stock symbols. We apply classification algorithms to the domain name itself (e.g., DGAs, typo-squatting…), something financial institutions have little reason to do with ticker symbols. They also don’t really have visibility into the originators of a trade, whereas we see the client IPs and ASNs in our logs. Despite these important differences, it’s worth realizing that many modern companies that manage heavy traffic loads have to solve very similar problems with very similar infrastructure, and will therefore arrive at comparable design decisions.
Choosing a robust messaging system
The foundation of our architecture rests on choosing the right messaging library; this choice is a key factor in how well our data processing pipeline will perform. I will spare you a detailed comparison of all the available technologies, but ZeroMQ is one of the best messaging libraries available, and it is very well known in the finance world. It also provides an excellent paradigm for building distributed systems with different message-passing patterns. During my analysis, some important metrics caught my attention:
This benchmark was performed by sending one million messages using ZeroMQ. It shows the latency of each message on the Y axis. The graph on the left (pictured in black) shows the results with a standard Linux kernel; the one on the right shows the results obtained with a real-time kernel. The average latency is 23μs for the standard kernel and 33μs for the real-time one. Compared with the ~97μs interval between two Querylog messages at peak time, that is about three to four times faster! This is looking very good so far.
There is a lot to be said about real-time systems in general, and I’ll try not to dwell too long on the subject here. It is, however, important to know that the term “real-time” is often misused. By real-time, most people mean “fast”, which is far from accurate. Performing in real time means that the process follows strict time constraints and meets well-defined deadlines. We often differentiate between soft, firm, hard, and critical time constraints; these are different levels of deadline observance, depending on what the real-time system is trying to accomplish. A program playing a video stream in real time (soft) won’t be subject to the same constraints as the program that triggers an airbag in a car (hard) or controls the cooling of a nuclear reactor (critical). However, they all have a strong sense of how much time they are allowed to complete their work.
The graph on the right demonstrates this very well: on average, messages take longer to be sent (33μs instead of 23μs), but they never show the latency spikes that would add unwanted non-determinism to our process.
That being said, in telecommunications we experience network-induced latency that depends on the protocols and cabling in use; we typically refer to such systems as near real-time. The main takeaway here is that this benchmark gives us a good perspective on the overall quality, reliability, and consistency of ZeroMQ. We also learn that we could move to stronger real-time constraints if we needed to, which could very well be the case if at some point we want to move our data processing pipeline into an embedded environment, for example.
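The mean-versus-spikes distinction is easy to check on any message channel. The following Python sketch is not a ZeroMQ benchmark: as a stand-in, it times messages passed between two threads through the standard library’s `queue.Queue`, then reports the mean alongside the tail (99th percentile and maximum). The helper names are hypothetical, and the numbers it prints will vary by machine; the point is that a low mean can coexist with occasional large spikes, which is what the real-time kernel in the benchmark avoids at the cost of a higher average.

```python
import queue
import statistics
import threading
import time
from typing import Dict, List


def measure_latencies(n_messages: int = 10_000) -> List[float]:
    """Send timestamped messages through a thread-safe queue and record
    the one-way latency of each one, in microseconds."""
    q: "queue.Queue[int]" = queue.Queue()
    latencies: List[float] = []

    def consumer() -> None:
        for _ in range(n_messages):
            sent_at = q.get()
            latencies.append((time.perf_counter_ns() - sent_at) / 1_000)

    t = threading.Thread(target=consumer)
    t.start()
    for _ in range(n_messages):
        q.put(time.perf_counter_ns())  # the message payload is its send time
    t.join()
    return latencies


def summarize(latencies: List[float]) -> Dict[str, float]:
    """The mean hides spikes; the tail (p99, max) exposes non-determinism."""
    ordered = sorted(latencies)
    p99 = ordered[min(len(ordered) - 1, int(0.99 * len(ordered)))]
    return {
        "mean_us": statistics.fmean(ordered),
        "p99_us": p99,
        "max_us": ordered[-1],
        "stdev_us": statistics.pstdev(ordered),
    }


if __name__ == "__main__":
    stats = summarize(measure_latencies())
    print({k: round(v, 1) for k, v in stats.items()})
```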
Designing our data processing pipeline
Let’s now discuss some implementation choices. The first key aspect is modularity; in my opinion, this is the only way such a system could work. It offers plenty of flexibility and code reuse, and it makes changes easier to deploy and calibrate. The central piece of the puzzle is the avalanche node:
An avalanche node can be seen as a plugin that reads messages from an input queue, applies some processing to the message data, and writes the result to an output queue. These queues can be handled in different ways. Sometimes it makes sense to keep queuing messages to make sure no data is lost; sometimes it’s smarter to opt for a fire-and-forget strategy where we only want to process the most recent messages. Frequently, we choose to queue messages up to a certain high water mark; once that size is reached, we start dropping messages. The right choice depends on the volume of incoming messages, the computing performance of the plugin, and how we want to forward messages to the next node. Here are two ways you can write an avalanche plugin:
Typically, every node runs in its own thread. This allows the pipeline to take full advantage of highly parallel architectures with multiple CPUs or cores. However, a pipeline design can grow very quickly, and we may end up running far more threads than the system can handle efficiently. This is where grouping several nodes onto the same thread, using plugin racks, becomes important. As you can see in the previous example, the first plugin only implements a process_message method and can be part of a sequence, while the second implements a full infinite loop and therefore has to run on its own. In other words, the first is rackable, while the second isn’t.
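As a hypothetical Python sketch of those two styles: only the `process_message` method name comes from this article; the class names, the `run` method, and the `None` stop sentinel are assumptions made for illustration, not Avalanche’s actual API.

```python
import queue
import threading
from typing import Any, Optional


class UppercaseDomain:
    """Rackable style: only implements process_message, so several such
    plugins can be called in sequence on one shared thread."""

    def __init__(self, **attributes: Any) -> None:
        self.attributes = attributes  # options coming from the pipeline file

    def process_message(self, message: str) -> Optional[str]:
        return message.upper()


class Tagger:
    """Non-rackable style: owns its loop over the input queue, so it needs
    a dedicated thread."""

    def __init__(self, inbox: "queue.Queue[Any]", outbox: "queue.Queue[Any]") -> None:
        self.inbox = inbox
        self.outbox = outbox

    def run(self) -> None:
        while True:
            message = self.inbox.get()
            if message is None:  # sentinel added for this sketch so the loop can stop
                break
            self.outbox.put(f"seen: {message}")


if __name__ == "__main__":
    inbox: "queue.Queue[Any]" = queue.Queue()
    outbox: "queue.Queue[Any]" = queue.Queue()
    worker = threading.Thread(target=Tagger(inbox, outbox).run)
    worker.start()
    inbox.put(UppercaseDomain().process_message("example.com"))
    inbox.put(None)
    worker.join()
    print(outbox.get())  # seen: EXAMPLE.COM
```

The first class has no loop of its own, so whoever owns the thread decides when to call it; the second blocks on its input queue and therefore needs a thread to itself.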
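The queueing strategies described earlier (keep queuing, keep only the most recent messages, or drop once a high water mark is reached) can be sketched with a small bounded buffer. This is a hypothetical helper in plain Python, not part of Avalanche or ZeroMQ; ZeroMQ sockets have their own high water mark options, and what happens when the mark is reached (blocking or dropping) depends on the socket type.

```python
from collections import deque
from typing import Any, Deque


class HighWaterMarkQueue:
    """Bounded FIFO buffer between two nodes (hypothetical helper).

    policy="drop_new":    once `hwm` messages are queued, new ones are dropped.
    policy="keep_latest": once full, the oldest message is evicted so the
                          consumer always sees the most recent data.
    """

    def __init__(self, hwm: float, policy: str = "drop_new") -> None:
        if policy not in ("drop_new", "keep_latest"):
            raise ValueError(f"unknown policy: {policy}")
        self.hwm = hwm
        self.policy = policy
        self.dropped = 0
        self._buf: Deque[Any] = deque()

    def put(self, message: Any) -> bool:
        """Enqueue a message; return False if the high water mark was hit
        and some message (new or old, depending on policy) was dropped."""
        if len(self._buf) < self.hwm:
            self._buf.append(message)
            return True
        self.dropped += 1
        if self.policy == "keep_latest":
            self._buf.popleft()  # evict the oldest message
            self._buf.append(message)
        return False

    def get(self) -> Any:
        return self._buf.popleft()

    def __len__(self) -> int:
        return len(self._buf)
```

Passing `hwm=float("inf")` effectively turns it into the keep-everything strategy, at the cost of unbounded memory if the consumer falls behind.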
Now that we’ve learned how to create modular plugins, let’s connect them! Avalanche uses JSON files to load pipeline definitions. For example, let’s create a very simple pipeline that looks like this:
It’s very simple: two plugins, the first one sending messages to the second. Here is how we would define it in the JSON file:
Each node is defined in the nodes JSON array with a unique ID, and the nodes are then connected in the edges section. Nodes can also receive parameters in the attributes section. What you put there depends entirely on your plugin and the way it works; it’s simply an easy way to pass external options to the plugin, which are then passed to the plugin constructor.
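Since the exact schema is not spelled out beyond `nodes`, `edges`, `attributes`, and per-node IDs, the following Python sketch uses a guessed layout (the `plugin`, `from`, and `to` keys and the `Echo` plugin are hypothetical) to show how such a definition could be loaded: node IDs are checked for uniqueness, every edge must point at a declared node, and each node’s attributes are passed as keyword arguments to its plugin constructor.

```python
import json
from typing import Any, Dict, List, Tuple

# Hypothetical definition: the "plugin", "from" and "to" keys are guesses.
PIPELINE_JSON = """
{
  "nodes": [
    {"id": "source", "plugin": "Echo", "attributes": {"prefix": ">> "}},
    {"id": "sink", "plugin": "Echo", "attributes": {}}
  ],
  "edges": [{"from": "source", "to": "sink"}]
}
"""


def load_pipeline(text: str) -> Tuple[Dict[str, dict], List[Tuple[str, str]]]:
    """Parse a definition, rejecting duplicate IDs and dangling edges."""
    spec = json.loads(text)
    nodes: Dict[str, dict] = {}
    for node in spec["nodes"]:
        if node["id"] in nodes:
            raise ValueError(f"duplicate node id: {node['id']}")
        nodes[node["id"]] = node
    edges = [(e["from"], e["to"]) for e in spec.get("edges", [])]
    for src, dst in edges:
        for end in (src, dst):
            if end not in nodes:
                raise ValueError(f"edge references unknown node: {end}")
    return nodes, edges


def instantiate(nodes: Dict[str, dict], registry: Dict[str, type]) -> Dict[str, Any]:
    """Create one plugin per node, passing its attributes to the constructor."""
    return {
        node_id: registry[node["plugin"]](**node.get("attributes", {}))
        for node_id, node in nodes.items()
    }


class Echo:
    """Toy plugin used only to exercise the loader."""

    def __init__(self, prefix: str = "") -> None:
        self.prefix = prefix

    def process_message(self, message: str) -> str:
        return self.prefix + message


if __name__ == "__main__":
    nodes, edges = load_pipeline(PIPELINE_JSON)
    plugins = instantiate(nodes, {"Echo": Echo})
    print(edges)  # [('source', 'sink')]
    print(plugins["source"].process_message("example.com"))  # >> example.com
```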
When your pipeline has reached some level of maturity, it may look more like this:
In this specific case, we have grouped some parts of the pipeline into racks. This allows us to decrease the number of threads used and gives us better clarity in the functional organization of the pipeline. In essence, a rack is a node running several plugins in sequence without any cycles. Here is how you would define a simple rack with the two previous plugins:
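Whatever the exact JSON syntax for racks, their runtime behavior is simple to sketch. In this hypothetical Python version, a `Rack` holds an ordered list of rackable plugins and feeds each message through them in turn on a single thread; because the list is linear, there are no cycles by construction. Treating a `None` return as “drop this message”, and the `Normalize` and `DropLocal` plugins, are assumptions of this sketch.

```python
from typing import Any, Iterable, List, Optional


class Rack:
    """Hypothetical rack: runs rackable plugins (anything exposing
    process_message) one after another on the caller's thread."""

    def __init__(self, plugins: Iterable[Any]) -> None:
        self.plugins: List[Any] = list(plugins)

    def process_message(self, message: Any) -> Optional[Any]:
        for plugin in self.plugins:
            message = plugin.process_message(message)
            if message is None:  # convention assumed here: None means "drop"
                return None
        return message


class Normalize:
    def process_message(self, message: str) -> str:
        return message.strip().lower()


class DropLocal:
    def process_message(self, message: str) -> Optional[str]:
        return None if message.endswith(".local") else message


if __name__ == "__main__":
    rack = Rack([Normalize(), DropLocal()])
    print(rack.process_message("  Example.COM "))  # example.com
    print(rack.process_message("Printer.LOCAL"))   # None
```

Because `Rack` itself exposes `process_message`, it can be used anywhere a rackable plugin is expected, including inside another rack.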
Once you have successfully implemented your plugins and pipeline definitions, all you have to do is start your avalanche pipeline with a simple command:
$ ./avalanche.py path/to/your/pipeline.json
We’ve now covered the first part of this data processing pipeline. Today, we are excited to share with you the core piece of our code so you can start using it and build your own custom implementations. In upcoming articles, we will share additional pieces such as custom traffic classifiers or methods to scale your pipeline on larger clusters in order to take your processing power to the next level.
We hope you enjoyed this blog post. Feel free to share feedback and your own custom plugins!