Deploying and running an In-Stream Process service as a “developer sandbox”

Grid Dynamics
Feb 01, 2017

In the previous blog post we went through the details of how to set up the Docker infrastructure with Mesos and Marathon, and how to bootstrap the environment to get it ready to host application services. Now it is time to deploy some applications.

Our blog’s purpose is not only to show you a final solution to a problem but also how to get to that solution. So in this post we’ll start by creating a lightweight version of the development environment, a “developer’s sandbox.” This is our chosen process for the following reasons:

  • The sandbox can be easily reproduced, even manually, and will give hands-on insights into how dockerized applications are managed even when working with highly abstracted solutions
  • The sandbox requires minimal resources, so this setup can be reproduced even on a mid-spec modern laptop
  • The sandbox lets developers implement custom logic and debug it without interfering with the production environment or even with other applications on the computer where we have it installed

As you may recall, the In-Stream Processing (ISP) service is a platform for running various real-time analytics applications. You can read more about the service itself in our previous blog posts 1-4 in this series. We also provide a real-time analytics reference application for Twitter stream sentiment analysis of popular movie reviews that runs on our ISP platform. The reference application is described in detail in a 7-part blog series called the Data Science Kitchen. You can even visualize the Twitter sentiments in action and play with the configuration parameters on this web portal.

Now it’s time to provide you with step-by-step instructions on how to bring all this to life. Specifically, we’ll break the process into three steps: 

  • Step 1: Define the deployment-ready components (ZooKeeper, Cassandra, Redis, Kafka, Spark Streaming) that make up our platform for In-Stream Processing applications.
  • Step 2: Define the deployment-ready analytics application that will run on the ISP platform. In this case, it’s the real-time Twitter sentiment analysis of movie reviews application.
  • Step 3: Run the resulting application definitions in the correct order and then watch things work!

Step 1: Define deployment-ready ISP components

We will not use clustering in our very first example because we want to keep the sandbox topology as simple as possible. Our simplified ISP platform looks like this:

[Figure: Simplified schema of In-Stream Processing platform]

The easiest way to get the system up and running is to configure all application communications between services through pre-defined ports on a load balancer. It’s very simple to implement, and will support multiple re-launches of the environment.

All we have to do is pre-assign load balancer ports with specific, hard-coded values for each service, and fill the container definitions with those values.
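One way to keep such a port plan in a single place is sketched below in Python. The port numbers are the ones that appear in the container definitions in this post; the names `SERVICE_PORTS`, `endpoint`, and `check_unique`, as well as the host `lb.example.internal`, are hypothetical and only serve the illustration.

```python
# Pre-assigned load balancer (service) ports, one per platform component.
# The values are the ones used in the container definitions in this post.
SERVICE_PORTS = {
    "zookeeper": 10010,
    "cassandra": 10011,
    "redis": 10012,
    "kafka": 10013,
}


def endpoint(service: str, haproxy_host: str) -> str:
    """Return the "host:port" address of a service behind the load balancer."""
    return f"{haproxy_host}:{SERVICE_PORTS[service]}"


def check_unique(ports: dict) -> None:
    """Fail early if two services were given the same port."""
    seen = {}
    for name, port in ports.items():
        if port in seen:
            raise ValueError(f"port {port} is used by both {seen[port]} and {name}")
        seen[port] = name


check_unique(SERVICE_PORTS)
# "lb.example.internal" is a made-up host name standing in for <HAPROXY_HOST>.
print(endpoint("kafka", "lb.example.internal"))
```

Because every definition reads its ports from the same table, re-launching the environment keeps all endpoints stable.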

Everything you need to deploy each of the platform’s components is listed below.

Component: Zookeeper

This is the tool we will use across our environment to make sure all other components are clustered correctly and stay in sync. Even in a non-clustered deployment, we still need it for Kafka to operate.

Brief summary:

Docker Image: wurstmeister/zookeeper
Dependencies: none
Ports to expose: 2181

This is one of the simplest components in our list. At a bare minimum, when we are just starting to use Zookeeper, all we need to do is run the open source image.

The full container definition (with some comments):

{
   "id": "/sandbox-1/zookeeper",

In order to make navigation easier we will group our sandbox environments under ‘folders’ in Marathon.

   "cpus": 0.25,
   "mem": 256,
   "disk": 256,

For sandbox use, Zookeeper does not require many resources.

   "instances": 1,
   "container": {
      "type": "DOCKER",
      "docker": {
         "image": "wurstmeister/zookeeper",

We need to tell Marathon that we want to expose the Zookeeper client port to the outside world, so let’s create the port mapping:

         "network": "BRIDGE",
         "portMappings": [
            {
               "containerPort": 2181,
               "hostPort": 0,
               "servicePort": 10010,
               "protocol": "tcp"
            }
         ]

Usually, you would find "servicePort": 0 here, which means that no port is pre-allocated on Marathon’s load balancer (haproxy) and the system takes the first unused one from the pool. That works well if you have a management system on top that takes care of wiring the applications together, or if the applications use a service discovery solution. We will start with the simplest “old school” way and evolve this approach over time.

      }
   }
}

The container definition can also be downloaded directly from this GitHub repository.
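If you would rather generate the definition than edit JSON by hand, the same structure can be built as a plain dictionary. The sketch below mirrors the fields shown above; the function name `zookeeper_app` and its parameters are hypothetical.

```python
import json


def zookeeper_app(group: str = "sandbox-1", service_port: int = 10010) -> dict:
    """Build the Zookeeper app definition shown above as a Python dict."""
    return {
        "id": f"/{group}/zookeeper",  # the group acts as a 'folder' in Marathon
        "cpus": 0.25,
        "mem": 256,
        "disk": 256,
        "instances": 1,
        "container": {
            "type": "DOCKER",
            "docker": {
                "image": "wurstmeister/zookeeper",
                "network": "BRIDGE",
                "portMappings": [
                    {
                        "containerPort": 2181,
                        "hostPort": 0,  # 0 lets a free host port be assigned
                        "servicePort": service_port,  # fixed load balancer port
                        "protocol": "tcp",
                    }
                ],
            },
        },
    }


# Serialize to JSON that can be saved to a file or submitted to Marathon.
print(json.dumps(zookeeper_app(), indent=3))
```

Passing a different `group` gives a second sandbox its own 'folder', but it would also need its own `service_port` to avoid clashing on the load balancer.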

Component: Cassandra

Our operational data store. This is the component which will use the additional features offered by Marathon, namely, host constraints, node label constraints, and persistent volumes when deployed in a production-like configuration.

Brief summary:

Docker Image: pocklet/cassandra
Ports to expose: 9042, 9160
Dependencies: none

In order for Cassandra to work properly with our application, we will also need to apply initial schema files to create a keyspace and table structure. They are available in the repository.

An excellent trick is to make Marathon do this for us every time a container is instantiated, which saves time during sandbox deployment. To do so, let’s call the script that applies the schema as part of the startup process:

"cmd": "cd ${MESOS_SANDBOX}/cassandra-schema && ./apply_schema.sh & start",

Let’s not forget to download this script and these schema files into our container prior to running them. We have uploaded this schema tarball to Amazon S3 for your convenience:

"fetch": [
   {
      "uri": "https://s3-us-west-1.amazonaws.com/streaming-artifacts/cassandra-schema.tar.gz",
      "extract": true,
      "executable": false,
      "cache": false
   }
],
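The "cmd" and "fetch" fragments follow the same pattern for every component that needs an artifact, so they can be attached by a small helper. This is only a sketch: `fetch_entry` and `with_startup_artifact` are hypothetical names, and the app id `/sandbox-1/cassandra` is assumed by analogy with the Zookeeper definition.

```python
def fetch_entry(uri: str, extract: bool = True) -> dict:
    """One item of the "fetch" list: download `uri` into the task sandbox."""
    return {"uri": uri, "extract": extract, "executable": False, "cache": False}


def with_startup_artifact(app: dict, uri: str, cmd: str) -> dict:
    """Return a copy of `app` that fetches `uri` and runs `cmd` on start."""
    updated = dict(app)
    updated["cmd"] = cmd
    # Build a new list so the original definition is left untouched.
    updated["fetch"] = list(app.get("fetch", [])) + [fetch_entry(uri)]
    return updated


base = {"id": "/sandbox-1/cassandra"}  # assumed id; rest of the definition omitted
cassandra = with_startup_artifact(
    base,
    uri="https://s3-us-west-1.amazonaws.com/streaming-artifacts/cassandra-schema.tar.gz",
    cmd="cd ${MESOS_SANDBOX}/cassandra-schema && ./apply_schema.sh & start",
)
```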

Full container definition

Component: Redis 

Our super-fast data store for the reference data, including dictionaries and calibration values used by the heart of in-stream processing — the mathematical models deployed within our Spark streaming applications.

Similar to Zookeeper, in its minimum configuration Redis “just works” out of the box with the basic community image:

Brief summary:

Docker Image: redis
Ports to expose: 6379

Container definition

Component: Kafka

Our message bus which delivers a real-time feed of tweets we fetched from the Twitter API to the Spark application.

Brief summary:

Image: wurstmeister/kafka
Dependencies: zookeeper
Ports to expose: 9092

The only two “custom” entries in the Marathon definition will be the Zookeeper endpoint and a list of topics to be created at the start. However, in order to operate properly, Kafka also needs to know its own external address and port:

"portMappings": [
   {
      "containerPort": 9092,
      "hostPort": 0,
      "servicePort": 10013,
      "protocol": "tcp"
   }
]
...

"env": {
   "KAFKA_ZOOKEEPER_CONNECT": "<HAPROXY_HOST>:10010",
   "KAFKA_CREATE_TOPICS": "tweets_topic:1:1",
   "KAFKA_ADVERTISED_HOST_NAME": "<HAPROXY_HOST>",
   "KAFKA_ADVERTISED_PORT": "10013"
}

Full container definition
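Since the same load balancer host appears in several values, the "env" section can be derived from it instead of being typed by hand. A minimal sketch, assuming the ports used in this post; `kafka_env` is a hypothetical helper and `10.0.0.5` is a made-up address standing in for <HAPROXY_HOST>.

```python
def kafka_env(
    haproxy_host: str,
    zookeeper_port: int = 10010,
    kafka_port: int = 10013,
    topics: str = "tweets_topic:1:1",  # name:partitions:replicas
) -> dict:
    """Build the "env" section of the Kafka definition for a given host."""
    return {
        # Where Kafka finds Zookeeper (through the load balancer).
        "KAFKA_ZOOKEEPER_CONNECT": f"{haproxy_host}:{zookeeper_port}",
        "KAFKA_CREATE_TOPICS": topics,
        # The address Kafka tells its clients to connect to.
        "KAFKA_ADVERTISED_HOST_NAME": haproxy_host,
        "KAFKA_ADVERTISED_PORT": str(kafka_port),  # env values are strings
    }


env = kafka_env("10.0.0.5")  # made-up address for illustration
print(env["KAFKA_ZOOKEEPER_CONNECT"])
```

Note that the advertised port is the load balancer port (10013), not the container port (9092), because clients reach Kafka through the load balancer.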

Component: Spark Streaming

This is the heart of our real-time data processing facility, where the analytics magic happens.
Since we decided to discuss the generic framework and particular streaming applications separately, let’s start with the bare bones.

Brief summary:

Docker Image: sequenceiq/spark:1.6.0
Dependencies: none
Ports: 8088, 8042, 4040, 2122

Container definition

Once we have all the containers we can fire them in the required order, wire the endpoints, and see it all working, ready to run our code.
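Firing a container means submitting its definition to Marathon, which accepts new applications on its REST endpoint `POST /v2/apps`. The sketch below only prepares such a request and does not send it; the Marathon address and the abbreviated app definition are placeholders, and `marathon_create_request` is a hypothetical helper.

```python
import json
import urllib.request


def marathon_create_request(marathon_url: str, app: dict) -> urllib.request.Request:
    """Prepare (but do not send) a POST /v2/apps request for one app definition."""
    return urllib.request.Request(
        url=marathon_url.rstrip("/") + "/v2/apps",
        data=json.dumps(app).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = marathon_create_request(
    "http://marathon.example.internal:8080",  # hypothetical Marathon address
    {"id": "/sandbox-1/redis"},  # placeholder; a real definition needs the container section
)
print(request.get_method(), request.full_url)
# To actually submit it: urllib.request.urlopen(request)
```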

Step 2: Define a deployment-ready analytics application that runs on an ISP platform. In our case, it’s the real-time Twitter sentiment analysis of social movie reviews

Once the ISP platform is ready to go, we can add real-time analytics applications that actually perform the business computations we wanted in the first place. Here is how to deploy our Twitter sentiment analysis application for social movie reviews. 

[Figure: Operational diagram for In-Stream Processing platform]

Component: Spark streaming application

We will deploy our code inside a container running Spark. It’s easy with Marathon; all we do is add instructions to fetch the artifact with our application. We also make sure the application is started inside the container every time the container starts:

"cmd": "cd ${MESOS_SANDBOX}/in-stream-tweets-analyzer && bash ./streaming-runner.sh",
...
"fetch": [
   {
      "uri": "https://s3-us-west-1.amazonaws.com/streaming-artifacts/in-stream-tweets-analyzer-latest.tar.gz",
      "executable": false,
      "cache": false
   }
]

We also need to provide our application with additional endpoints to work with — Kafka to get raw data from, Redis to fetch dictionaries, and Cassandra to store the processing results.
The easiest and most common way to do this in the dockerized world is to pass them as environment variables and make sure they are picked up by the application itself:

"env": {
   "KAFKA_BROKER_LIST": "<HAPROXY_HOST>:10013",
   "CASSANDRA_HOST": "<HAPROXY_HOST>",
   "CASSANDRA_PORT": "10011",
   "REDIS_HOST": "<HAPROXY_HOST>",
   "REDIS_PORT": "10012"
},
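On the application side, "picked up by the application itself" simply means reading these variables at startup. The reference application's own code is not shown in this post, so the following is only a Python sketch of the pattern; `Endpoints` and `load_endpoints` are hypothetical names.

```python
import os
from typing import Mapping, NamedTuple


class Endpoints(NamedTuple):
    kafka_brokers: str
    cassandra_host: str
    cassandra_port: int
    redis_host: str
    redis_port: int


def load_endpoints(env: Mapping[str, str] = os.environ) -> Endpoints:
    """Read the endpoint settings, failing loudly if one is missing."""
    try:
        return Endpoints(
            kafka_brokers=env["KAFKA_BROKER_LIST"],
            cassandra_host=env["CASSANDRA_HOST"],
            cassandra_port=int(env["CASSANDRA_PORT"]),
            redis_host=env["REDIS_HOST"],
            redis_port=int(env["REDIS_PORT"]),
        )
    except KeyError as missing:
        raise RuntimeError(f"required environment variable {missing} is not set") from None


# Example with an explicit mapping instead of the real process environment.
example = load_endpoints({
    "KAFKA_BROKER_LIST": "lb.example.internal:10013",
    "CASSANDRA_HOST": "lb.example.internal",
    "CASSANDRA_PORT": "10011",
    "REDIS_HOST": "lb.example.internal",
    "REDIS_PORT": "10012",
})
print(example.cassandra_port)
```

Failing at startup when a variable is missing makes a wiring mistake visible immediately in the task logs rather than later as a connection error.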

Brief summary:

Image: sequenceiq/spark:1.6.0
Dependencies: Redis, Cassandra, Kafka
Ports: 8088, 8042, 4040, 2122

Full container definition

Component: Twitter consumer application

The Twitter consumer application needs to grab data from the Twitter streaming API and feed it to the Spark application for processing. This Twitter consumer application is written in Scala. To launch it, all we need to do is get an open source Java image, specify which application to run, and provide environment variables with the API access tokens and a Kafka endpoint:

"cmd": "bash ${MESOS_SANDBOX}/twitter-consumer-runner.sh --movie \"<movie name>\"",
...
"env": {
   "TWITTER_ACCESS_TOKEN": "<>",
   "TWITTER_CONSUMER_KEY": "<>",
   "TWITTER_CONSUMER_SECRET": "<>",
   "TWITTER_ACCESS_TOKEN_SECRET": "<>",
   "KAFKA_BROKER_LIST": "<HAPROXY_HOST>:10013"
},


"fetch": [
   {
      "uri": "https://s3-us-west-1.amazonaws.com/streaming-artifacts/twitter-consumer.tar.gz",
      "extract": true,
      "executable": false,
      "cache": false
   }
]

Brief summary:

Docker Image: java:8
Dependencies: Kafka
Ports: none

Full container definition
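Each movie gets its own consumer instance, so the movie-specific parts of the definition can be generated from the title. The sketch below is one possible approach, not the scheme used in our repository: the id pattern `twitter-consumer-<slug>` and the helper names are hypothetical, and the movie titles are made up.

```python
import re
import shlex


def slug(movie: str) -> str:
    """Turn a movie title into a lowercase fragment suitable for an app id."""
    return re.sub(r"[^a-z0-9]+", "-", movie.lower()).strip("-")


def consumer_cmd(movie: str) -> str:
    """Command line for one consumer instance, with the title quoted for the shell."""
    return "bash ${MESOS_SANDBOX}/twitter-consumer-runner.sh --movie " + shlex.quote(movie)


def consumer_app(movie: str, group: str = "sandbox-1") -> dict:
    """Movie-specific part of a consumer definition (env and fetch omitted)."""
    return {
        "id": f"/{group}/twitter-consumer-{slug(movie)}",  # assumed id scheme
        "cmd": consumer_cmd(movie),
    }


# Made-up titles, for illustration only.
for title in ("Example Movie", "Another Film: Part 2"):
    print(consumer_app(title)["id"])
```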

Component: web UI

The last element of the application is a web UI to visualize the results of the sentiment analysis. The web UI is a NodeJS application that gets its data from Cassandra.

"cmd": "cd ${MESOS_SANDBOX}/webclient && npm install && WEB_CLIENT_PORT=3005 node server/server.js",
...
"env": {
   "CASSANDRA_PORT": "10011",
   "CASSANDRA_HOST": "<HAPROXY_HOST>"
},


"fetch": [
   {
      "uri": "https://s3-us-west-1.amazonaws.com/streaming-artifacts/ui.tar.gz",
      "extract": true,
      "executable": false,
      "cache": false
   }
]

Brief summary:

Docker Image: node
Dependencies: Cassandra
Ports: 3005

Full container definition

Also, in order to quickly access the list of movies we’re keeping an eye on, the application relies on records about them in the Cassandra table called ‘movies’ within our keyspace.

So, in order to see the result, we will need to get back to Cassandra and insert corresponding data about every movie for which we have an instance of the Twitter consumer application running:

On the host where the Cassandra container was spawned:

$ docker exec -it <cassandra container id> cqlsh
cqlsh> use twitter_sentiment ;
cqlsh:twitter_sentiment> INSERT INTO movies (title, rating, release) VALUES ('<movie name>', <imdb score>, '<movie release date>');
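When there are several movies, typing these statements by hand gets error-prone, especially for titles that contain an apostrophe. The sketch below generates the statement and doubles single quotes, which is how CQL escapes them inside string literals. It assumes, as in the statement above, that the rating is numeric and the release date is passed as a quoted string; the helper names and the sample values are made up.

```python
def cql_string(value: str) -> str:
    """Quote a Python string as a CQL string literal (single quotes are doubled)."""
    return "'" + value.replace("'", "''") + "'"


def movie_insert(title: str, rating: float, release: str) -> str:
    """Build the INSERT statement for one row of the 'movies' table."""
    return (
        "INSERT INTO movies (title, rating, release) VALUES ("
        f"{cql_string(title)}, {rating}, {cql_string(release)});"
    )


# Made-up sample values, for illustration only.
print(movie_insert("Example's Movie", 7.5, "2016-01-01"))
# prints: INSERT INTO movies (title, rating, release) VALUES ('Example''s Movie', 7.5, '2016-01-01');
```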

Step 3: Running our very first environment!

Since we chose to go with pre-defined ports on a load-balancer for our developer-style environment, all we need to do is to fire our components in the correct order:

  • Zookeeper, Redis, Cassandra
  • Kafka, Web UI
  • Spark Streaming application, Twitter consumer(s)

… and here we go. Once fired, we have something to play with!
* Don’t forget about adding movie data to Cassandra. For our demo application it’s essential to have the ability to show a list of movies and fetch their data.
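The launch order above follows directly from the "Dependencies" lines in the component summaries. As a sketch, the waves can be derived rather than memorized; the component names used as keys are our own labels, and Redis, which has no dependencies listed, is treated here as having none.

```python
# Dependencies as listed in the component summaries of this post.
DEPENDENCIES = {
    "zookeeper": [],
    "redis": [],  # no dependencies listed; assumed to be none
    "cassandra": [],
    "kafka": ["zookeeper"],
    "web-ui": ["cassandra"],
    "spark-streaming-app": ["redis", "cassandra", "kafka"],
    "twitter-consumer": ["kafka"],
}


def launch_waves(deps: dict) -> list:
    """Group components into waves; each wave depends only on earlier waves."""
    remaining = {name: set(needs) for name, needs in deps.items()}
    started = set()
    waves = []
    while remaining:
        # A component is ready once everything it needs has been started.
        ready = sorted(name for name, needs in remaining.items() if needs <= started)
        if not ready:
            raise ValueError(f"unsatisfiable dependencies among {sorted(remaining)}")
        waves.append(ready)
        started.update(ready)
        for name in ready:
            del remaining[name]
    return waves


for number, wave in enumerate(launch_waves(DEPENDENCIES), start=1):
    print(number, ", ".join(wave))
```

With the dependencies above, this yields the same three waves as the list: Zookeeper, Redis, and Cassandra first, then Kafka and the web UI, then the Spark Streaming application and the Twitter consumer.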

Of course, our environment has limitations which restrict it to the “sandbox” role:

  • Components are running in non-clustered configurations
  • Endpoints are hard-coded
  • There are manual steps in the process of bringing it to life

We are going to address these limitations in future blog posts. Indeed, each item in this three-item list deserves an entire blog post of its own. As usual, we will provide you with the rationale for the high-level approach chosen by our team, describe its goals and limitations, and later we’ll offer the source code and a one-click “magic button” to get the entire platform to come up on AWS in minutes.

So please stay tuned. More information is coming soon!

