Companies are increasing the amount of incoming data they need to process and store, so it can be overwhelming to find a scalable solution that can route this data to multiple sources efficiently. In an attempt to solve this issue, Netflix has released one of their internal tools as open source that helps centralize incoming application data to be sent to different data technologies.
Suro is a distributed pipeline that services the moving, aggregating, routing, and storing of data across multiple platforms. Netflix uses Suro for distributing data across their network of databases including numerous AWS EC2 instances and large Hadoop (HDFS) clusters. It is scalable, and the data transferred is highly available utilizing the async/sync communication protocols in the Suro client. The Suro server supports inputs from Thrift and Kafka, and outputs data to its built-in data sinks for the local file system, HDFS, S3, Kafka, and Elasticsearch.
In this tutorial, we dive into a basic client/server model of Suro. First, we will create a basic server that simply takes a message sent from the client using Thrift. Upon receiving the message through Thrift, the server then routes it to a local file using the LocalFileSink. Next we add a layer to incorporate sending messages to the ElasticSearchSink. This tutorial is meant to help readers better understand the basic functionalities of Suro and highlight the interesting feature of routing incoming data to multiple sources.
Requirements:
- Unix programming environment
- Java Runtime Environment (JRE) installed
- Basic knowledge of UNIX terminal commands
- git installed
- A Java IDE (e.g., IntelliJ, Eclipse)
- A basic Elasticsearch cluster/node setup (download and installation instructions here.)
Part 1:
Setting Up the Server:
We begin with setting up a basic Suro server.
- Clone the Suro GitHub repository
git clone https://github.com/Netflix/suro
- cd suro
- Make sure your repository is checked out to branch 0.2.9
git branch
- If not, checkout branch 0.2.9
git checkout 0.2.9
- Compile using Gradle commands:
./gradlew installApp
- cd suro-server
- Modify configuration files in the “conf” directory:
File: input.json
Description: This configuration file specifies the types of inputs Suro server will consume. Currently it only has two options: Thrift and Kafka. A Suro server may have multiple inputs. For this tutorial, we will only look at the Thrift input.
[ { "type": "thrift" } ]
File: routingmap.json:
Description: This configuration file specifies which routing keys point to which sinks. For the first part of the tutorial, we will leave this empty because we will only be using one sink. If a key is not specified in this file, the routing key will point to the default sink.
{ }
File: sink.json
Description: This configuration file specifies the sinks to which the Suro server will route. We will begin by using a LocalFileSink as our default. The outputDir path specifies where the sent messages will be saved.
{ "default": { "type": "local", "outputDir": "/tmp/suroserver/demo", "minPercentFreeDisk": 10, "writer": { "type": "text" } } }
Setting Up the Client:
We will now create a separate Suro client. This post demonstrates how to do this on IntelliJ, but other Java IDE’s should be similar.
1. Create a Maven project:
Open IntelliJ and create a new project. Select Maven on the sidebar and choose the Java SDK installed on your system. Click Next.
Next enter a Groupid and Artifactid. Click Next.
Finally, enter a project name and click Finish.
2. In the generated pom.xml file, add these repositories and dependencies.
<repositories> <repository> <id>github.release.repo</id> <url>https://raw.github.com/bulldog2011/bulldog-repo/master/repo/releases/</url> </repository> </repositories> <dependencies> <dependency> <groupId>com.leansoft</groupId> <artifactId>bigqueue</artifactId> <version>0.7.0</version> </dependency> <dependency> <groupId>com.netflix.suro</groupId> <artifactId>suro-client</artifactId> <version>0.2.9</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>18.0</version> </dependency> </dependencies>
3. Create a class named DemoSuroClient in /demo-suro-client/src/main/java:
4. Add the following to DemoSuroClient class to look like this:
import java.util.Properties;
import com.netflix.suro.ClientConfig; import com.netflix.suro.client.SuroClient; import com.netflix.suro.message.Message; public class DemoSuroClient { public static void main(String[] args) { final Properties clientProperties = new Properties(); clientProperties.setProperty(ClientConfig.LB_TYPE, "static"); clientProperties.setProperty(ClientConfig.LB_SERVER, "localhost:7101"); SuroClient client = new SuroClient(clientProperties); client.send(new Message("routingKey", "Test Message".getBytes())); client.shutdown(); } }
Explanation:
This is our basic Suro client. We create a client using the minimum required properties:
- the load balancer type
- the IP address and port number of the Suro server (you can find this information outputted on terminal when you run the server)
Next we send a message to the Suro server, which includes the routing key and the message payload in bytes.
Running/Testing Suro (LocalFileSink):
Time to run Suro!
Run Suro Server:
- In terminal, cd to the /suro/suro-server directory
- Enter:
java -cp "build/install/suro-server/lib/*" com.netflix.suro.SuroServer -m conf/routingmap.json -s conf/sink.json -i conf/input.json
Run Suro Client:
NOTE: You might see this exception outputted by the server on the terminal:
org.apache.thrift.transport.TTransportException: java.net.SocketException: Invalid argument
...
Ignore this message. It is a known issue. Click here for more info.
Verify the message was sent to the server:
- Go to http://localhost:7103/surosinkstat
You should see:
default:1 msgs, 12 bytes written, 0 have empty routing key. 0 failures of closing files
Stop running the client (Note: you may notice that your client doesn’t shut down. This is a known issue.)
Stop running server
- In terminal, hit “Ctrl-C”
Verify messages were received by the LocalFileSink
- cd /tmp/suroserver/demo
- cat *.done (* = arbitrary file name Suro generates for the file)
- You should see:
Test Message
So far in the tutorial, we managed to create a simple Suro client that can send a string message via Thrift to the Suro server. The server then routes the message to the LocalFileSink. We verified this by checking “suroinkstat” using the web browser interface and checking the physical file in the local directory. Take note that the routing key sent with the message is arbitrary since we only have one sink. Therefore all messages sent with any routing key will be routed to the default sink, that is the LocalFileSink.
Part 2:
Setting Up Suro to Use ElasticSearchSink:
We will now simulate the big data problem on a smaller scale. The next part of the tutorial adds the ElasticSearchSink layer to demonstrate how Suro is able to route messages to different data storage sources with its built in sinks.
Modify Suro Server configuration files:
File: routingmap.json
Description: In part 1 this file was empty, but now we need to add a message routing map because we have multiple sinks. We specify that messages with routing key “esKey” will be directed to the sink labeled “esSink.” Any other routing keys will be directed to the default sink.
{ "esKey": { "where": [ { "sink": "esSink" } ] } }
File: sink.json
First we will go over the parameters needed for the ElasticSearchSink configurations.
Important parameters: The above image is what our ElasticSearchSink configurations will look like. Here is a brief summary of the parameters:
1. Type: specifies the type of sink
2. Cluster.name: name of Elasticsearch cluster
3. AddressList: the IP addresses of Elasticsearch nodes to communicate with
4. IndexInfo: this encapsulates the required information Elasticsearch needs to index the given messages. Elasticsearch assumes messages are in JSON format.
5. Timestamp (required):
– specify the timestamp field of your JSON message
– in this example, the timestamp field is “ts”
6. IndexTypeMap (required):
– specifies which index and type to store the message in with the given routing key
– Field values:
– esKey: the routing key used to route messages to this sink
– index: the index name
– type: the index type
– for this tutorial, I chose to name the index “index” and type “type”
Other parameters:
7. clientName: arbitrary Elasticsearch client name
8. batchSize, batchTimeout, corePoolSize, maxPoolSize, jobQueueSize, queue4Sink: configurable settings for performance. For more information about these parameters, click here.
Adding the ElasticSearchSink configuration settings to the sink.json file should look like the code block below. Now we have two sinks: default (LocalFileSink) and esSink (ElasticSearchSink).
{ "default": { "type": "local", "outputDir": "/tmp/suroserver/demo", "minPercentFreeDisk": 10, "writer": { "type": "text" } }, "esSink": { "type": "elasticsearch" "clientName": "test-client", "addressList": [ "127.0.0.1:9300" ], "indexInfo": { "timestamp": { "field": "ts" }, "indexTypeMap": {"esKey":"index:type"}, "type": "default" }, "batchSize": 2500, "batchTimeout": 10000, "cluster.name": "elasticsearch", "corePoolSize": 4, "maxPoolSize": 4, "jobQueueSize": 0, "queue4Sink": { "capacity": 1000000, "type": "memory" } } }
Modify Suro Client:
In DemoSuroClient.java:
- Add code to send a basic JSON message to the ElasticSearchSink and LocalFileSink
- The JSON message has two fields:
- ts: the timestamp field that holds the current time (UTC) in milliseconds
- f1: arbitrary field with arbitrary value “v1”
import java.util.Map; import java.util.Properties;
import com.netflix.suro.ClientConfig; import com.netflix.suro.client.SuroClient; import com.netflix.suro.message.Message; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.netflix.suro.jackson.DefaultObjectMapper; import com.google.common.collect.ImmutableMap;
public class DemoSuroClient { public static void main(String[] args) throws JsonProcessingException { final Properties clientProperties = new Properties(); clientProperties.setProperty(ClientConfig.LB_TYPE, "static"); clientProperties.setProperty(ClientConfig.LB_SERVER, "localhost:7101"); ObjectMapper jsonMapper = new DefaultObjectMapper(); SuroClient client = new SuroClient(clientProperties); Map<String, Object> msg = new ImmutableMap.Builder<String, Object>() .put("ts", System.currentTimeMillis()) .put("f1", "v1") .build(); client.send(new Message("routingKey", jsonMapper.writeValueAsBytes(msg))); client.send(new Message("esKey", jsonMapper.writeValueAsBytes(msg))); client.shutdown(); } }
Running/Testing Suro (LocalFileSink and ElasticSearchSink)
Round two!
Make sure Elasticsearch is running. (If not, follow the Installation steps here.)
Run Suro Server: cd to the suro/suro-server directory and enter:
java -cp "build/install/suro-server/lib/*" com.netflix.suro.SuroServer -m conf/routingmap.json -s conf/sink.json -i conf/input.json
Run the Suro Client.
Verify messages were sent to the server. Go to http://localhost:7103/surosinkstat. You should see:
default:1 msgs, 30 bytes written, 0 have empty routing key. 0 failures of closing files esSink:indexed: 1, rejected: 0, parsing failed: 0
Stop running client and server.
Verify messages were received by the LocalFileSink:
-
cd /tmp/suroserver/demo
-
cat *.done
- *=arbitrary file name Suro generates for this file
- File should be different than the one outputted in part 1 of the tutorial
- “ts” will be different since it is the current time
You should see:
{"ts":1427841866680,"f1":"v1"}
Verify messages were received by the ElasticSearchSink (curl ‘localhost:9200/_cat/indices?v’). You should see:
health status index pri rep docs.count docs.deleted store.size pri.store.size green open index 1 1 1 0 2.7kb 2.7kb
Now search the index (curl -XGET ‘http://localhost:9200/index/_search?pretty’). The “_id” and “ts” will be different. You should see:
{ "took" : 1 "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "failed" : 0 }, "hits" : { "total" : 1, "max_score" : 1.0, "hits" : [ { "_index" : "index", "_type" : "type", "_id": "AUxyCAKYW6d2U9MFnCYn", "_score" : 1.0, "_source":{"ts":1427842261844,"f1":"v1"} } ] } }
We now have successfully added an ElasticSearchSink to our Suro setup. Taking a look at the above output from Elasticsearch, we can see that the document has index name “index” and type name “type” as specified in our ElasticSearchSink configuration settings. The ID field is auto-generated but can be set to a field in the document (see idFields section on this page). The source field matches the output returned by the LocalFileSink, so we have demonstrated it is possible to take a single input and route that information to multiple data sources.
Conclusion:
At OpenDNS, we are still experimenting with Suro; trying to find the best way to configure it to fit our needs. So far it looks promising. We believe Suro is capable of providing a scalable solution for routing data across multiple platforms without sacrificing performance. However, documentation of other users’ experiences and simple use-case examples are lacking online. We hope that our tutorial will help others save time trying to figure out how to use Suro and jump right into using it to solve big data needs. For more information, you can visit Netflix’s Suro wiki page and Netflix’s own blog post debuting Suro.