The Umbrella Security Labs research team is constantly processing terabytes of log files through dozens of Hadoop jobs in order to build the data we need for our predictive models. Some tools have proven to be invaluable time savers. The tool we use most often to write map/reduce jobs is Pig, a high-level language that makes it easy to describe common map/reduce workflows. The tool builds a standalone JAR file out of a script, which eventually runs as a standard Hadoop job.
The language itself, naturally called Pig Latin, is a succession of simple statements taking an input and producing an output. Inputs and outputs are structured data composed of bags, maps, tuples and scalar data.
A bag is a collection of tuples, and bags can be nested. This is one of the main things that make Pig very simple yet very powerful.
Columnar data, as stored in HDFS, usually doesn’t support any kind of nested data, even though Parquet looks extremely promising in this regard.
This set of (name, client IP, timestamp) rows can be stored like this on HDFS:
www.example.com 172.16.4.2 1365116918
www.example.com 10.69.42.21 1365116342
www.example.com 10.69.42.21 1365135730
www.example.com 10.69.42.21 1365132469
www.example.com 192.168.9.6 1365003704
cuteoverload.com 192.111.0.1 1365176541
cuteoverload.com 192.111.0.1 1365200469
But Pig can map these records to arbitrary schemas that provide a far more natural view of the same data:
{ ("www.example.com",{
("172.16.4.2", {1365116918, 1365116342}),
("10.69.42.21", {1365171304, 1365135730, 1365132469}),
("192.168.9.6", {1365003704})
}),
("cuteoverload.com",{
("192.111.0.1", {1365176541, 1365200469})
})
}
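For illustration, here is a minimal sketch of the Pig statements that could build such a nested view out of the flat rows; the file name, field names and tab-separated layout are assumptions:
-- Load the flat rows; 'queries.tsv', the field names and the tab separator are made up.
raw = LOAD 'queries.tsv' USING PigStorage('\t') AS (name:chararray, client_ip:chararray, ts:long);
-- First level of nesting: one bag of timestamps per (name, client IP) pair.
per_ip = FOREACH (GROUP raw BY (name, client_ip))
         GENERATE group.name AS name, group.client_ip AS client_ip, raw.ts AS timestamps;
-- Second level: one bag of (client IP, timestamps) tuples per name.
per_name = FOREACH (GROUP per_ip BY name)
           GENERATE group AS name, per_ip.(client_ip, timestamps) AS clients;
DUMP per_name;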
A bag can be arbitrarily large, and can itself contain arbitrarily large bags.
More importantly, a bag is spillable: its content doesn’t have to fit in memory.
While Pig will do its best to keep the content of a bag in memory before processing it, it also includes a sophisticated memory manager that can transparently spill that content to disk if necessary.
This is a very important feature for our M/R jobs, where some popular domain names like Google.com can be linked to huge amounts of data.
Not having to worry about the data fitting in memory, and possibly having to rewrite our jobs in a different way just for a handful of edge cases, is a massive time saver.
Pig’s documentation and tutorials are excellent, so I will just walk through a couple of typical operations to show how quickly they can be achieved. These are mostly self-explanatory.
Filtering
d1_with_bad_sum = FILTER d1_with_bad_sum BY n > 1;
Aggregation
d1_with_bad_log_n_max = FOREACH (GROUP d1_with_bad_sum ALL) {
GENERATE LOG(MAX(d1_with_bad_sum.n)) AS log_n_max;
};
Joins
jgs = FOREACH (JOIN d1_with_bad_sum BY name, d1_sum BY name) {
GENERATE d1_with_bad_sum::name AS name,
((-100.0 * d1_with_bad_sum::p * LOG(d1_with_bad_sum::n)) /
(d1_sum::p * d1_with_bad_log_n_max.log_n_max)) AS score;
};
Sorting
jgs = ORDER jgs BY score ASC, name ASC;
Secondary sorting
pairs_r = FOREACH (GROUP raw BY client_ip) {
client_queries = FOREACH raw GENERATE ts, name;
client_queries = ORDER client_queries BY ts, name;
GENERATE client_queries;
};
In this last example, queries are grouped by client IP address, and for each client IP address, the list of queries is sorted by timestamp and name.
These statements are lazily evaluated, and transparently converted to sequences of mappers, reducers and combiners.
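As a minimal sketch of that behavior, building on the jgs relation above (the output path is an assumption), nothing is executed until a STORE or DUMP is reached:
-- No Hadoop job has started yet: Pig only builds a logical plan as statements are parsed.
jgs_top = LIMIT jgs 100;
-- EXPLAIN prints the logical, physical and map/reduce plans without running them.
EXPLAIN jgs_top;
-- Only a STORE (or DUMP) actually launches the underlying map/reduce jobs.
STORE jgs_top INTO 'top_scores' USING PigStorage();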
When the Pig Latin language is not enough
The simplicity of the Pig Latin language comes with some apparent limitations.
Once Pig has been added to your toolbox, you will probably feel the need for some additional functions, be it for loading and saving data in an unsupported format, or for applying a specific operation to a collection of data.
Enter Pig User Defined Functions. Thanks to a very clean API, additional functions can easily be written in virtually any programming language implementation running on the JVM.
Our language of choice for Pig UDFs is Ruby, or rather JRuby, a fantastic implementation of the Ruby language for the JVM.
Pig ships with first-class support for JRuby and the exposed API couldn’t be any simpler.
As an example, let’s teach Pig a new trick: transforming Labeled Tab-separated Values into data bags of key/value pairs (maps).
All we need to do is declare a new class that inherits from PigUdf. All public methods of this class will be immediately visible to Pig, the only requirement being to define the output schema.
require 'pigudf'

class LTSV < PigUdf
  outputSchema "map[]"

  # Turn one Labeled Tab-separated Values line into a map of key/value pairs.
  def parse(str)
    Hash[str.split("\t").map { |f| f.split(":", 2) }]
  end
end
Now, let’s use this new function from Pig:
REGISTER 'ltsv.rb' USING jruby AS LTSV;
raw = LOAD 'access.log' USING PigStorage('\n') AS (line:chararray);
parsed = FOREACH raw GENERATE LTSV.parse(line) AS m;
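From there, individual fields can be pulled out of the resulting map with the # operator; the host and status keys below are hypothetical and depend on what the LTSV lines actually contain:
-- 'host' and 'status' are hypothetical LTSV keys, purely for illustration.
hosts = FOREACH parsed GENERATE m#'host' AS host, m#'status' AS status;
DUMP hosts;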
Et voila! JRuby lets us easily extend Pig with new functions that can operate on scalar data as well as ginormous data bags that Pig will automagically spill to disk to keep memory usage below a high watermark.
Numerous third-party packages for Pig are freely available; our current favorites are LinkedIn’s DataFu and the GraphChi graph engine.
Things we wish we knew when we started using Pig
Excited about Pig? Here are a few things that may not stand out when reading the documentation for the first time, but that turn out to be extremely useful.
– The DESCRIBE and ILLUSTRATE commands are your best friends for debugging a script (see the short example after this list). The ILLUSTRATE command alone might be a good reason for choosing Pig over something else.
– pig -x local loads and stores files locally instead of running a distributed map/reduce job. For small data sets, the job is very likely to start and run orders of magnitude faster. This comes in handy when developing a new script.
– Besides a command-line tool, Pig is a Java package. Scripts can be executed from any language implementation running on the JVM. This is especially useful for iterative processes.
– Nested FOREACH statements are very powerful.
– Compress map output and temporary files:
SET mapred.compress.map.output true;
SET mapred.map.output.compression.codec org.apache.hadoop.io.compress.SnappyCodec;
SET pig.tmpfilecompression true;
SET pig.tmpfilecompression.codec lzo;
– The content of the ~/.pigbootup file is read and executed at startup time. This is a good place to store default settings.
– Speaking of settings, these will make your eyes bleed a bit less:
SET pig.pretty.print.schema true;
SET verbose false;
– Read the FAQ. Twice. Pay attention to the different JOIN strategies. Using the correct one can make a job run way faster.
– New features introduced in recent versions are disabled by default for a reason. They can provide significant speedups, but also lead to obscure bugs that will eventually drive you crazy. You’ve been warned.
– Real pigs can’t fly.
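Here is the short debugging example promised above, reusing the parsed relation from the LTSV section:
-- Print the schema Pig has inferred for a relation.
DESCRIBE parsed;
-- Run the pipeline on a small sample of the input and show how each statement transforms it.
ILLUSTRATE parsed;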
Pig is only one of the numerous tools we use daily in order to slice and dice data.
While we are still occasionally using Java and Rubydoop, the Pig+JRuby combo currently remains the most efficient way for us to quickly develop and test new algorithms, such as the ones we use to discover thousands of suspicious domains every day.