
Building a real-time streaming dashboard with Spark, Grafana, Chronograf and InfluxDB

When you work with the Internet of Things (IoT) or other real-time data sources, there is one thing that keeps bothering you: a real-time visualization dashboard. So in this post, I will explain how you can set up a pipeline that lets you visualize your data in two popular visualization tools: Chronograf and Grafana.

Note: This post will not cover setting up Grafana or Chronograf due to the amount of work required. For more information about how to set these up, feel free to check this other post: Visualize metrics with Grafana and InfluxDB

Architecture

Let’s start by discussing what we will be creating and why we are creating it like this. The eventual architecture will be something that looks like this:

/assets/images/posts/real-time-streaming-dashboard/architecture.png

We can identify several components in here:

[Stream] EventHub

Description: EventHub allows us to ingest events and distribute them from one component to another.

Why? We use an EventHub because we will be processing a lot of IoT events, which is easier when all our data comes in through one single point that we can pipe to other components. Note that we are not using an IoT Hub (which can also manage our IoT devices), because we will be sending our data from a custom-built script, so there is no need for device management.

Alternatives?

  • Apache Kafka (#opensource, kind of like EventHub)
  • IoT Hub (Includes device management and things such as configuration pushing)

[Spark Engine] Databricks (built on Apache Spark, #opensource)

Description: Databricks is a platform that allows you to process big data and streams using Apache Spark.

Why? We want to be able to ingest our stream, modify it and stream it to our dashboard. A component such as this allows for easy redistribution while still letting us modify the stream, for example by streaming to multiple endpoints or by windowing the data.

Alternatives?

  • Azure Stream Analytics
  • CosmosDB (using its change feed)
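Data windowing, mentioned above, means grouping a stream into time buckets before aggregating it. In Spark Structured Streaming this is done with the window function from org.apache.spark.sql.functions (usually together with withWatermark for late data). The plain-JavaScript sketch below only illustrates the idea of a tumbling (fixed, non-overlapping) window; the helper name and the event shape { sensor, createdAt, tempVal } are assumptions, and this is not how Spark implements it.

```javascript
// Tumbling-window sketch: assign each event to a fixed, non-overlapping time
// window based on its timestamp, then average one numeric field per window.
// Assumption: events look like { sensor, createdAt (ISO string), tempVal (number) }.
function tumblingWindowAverage(events, windowMs, field) {
  const windows = new Map(); // window start (epoch ms) -> { sum, count }

  for (const event of events) {
    const ts = Date.parse(event.createdAt);
    const value = event[field];
    if (Number.isNaN(ts) || typeof value !== "number") continue; // skip unparseable events

    const start = Math.floor(ts / windowMs) * windowMs; // align to the window boundary
    const acc = windows.get(start) || { sum: 0, count: 0 };
    acc.sum += value;
    acc.count += 1;
    windows.set(start, acc);
  }

  // Emit one row per window, ordered by window start
  return [...windows.entries()]
    .sort((a, b) => a[0] - b[0])
    .map(([start, { sum, count }]) => ({
      windowStart: new Date(start).toISOString(),
      windowEnd: new Date(start + windowMs).toISOString(),
      avg: sum / count,
    }));
}

// Hypothetical sample events spread over two 10-second windows
const sample = [
  { sensor: "sensor_1", createdAt: "2019-01-01T10:00:01Z", tempVal: 20 },
  { sensor: "sensor_2", createdAt: "2019-01-01T10:00:04Z", tempVal: 22 },
  { sensor: "sensor_1", createdAt: "2019-01-01T10:00:12Z", tempVal: 25 },
];

console.log(tumblingWindowAverage(sample, 10000, "tempVal"));
```

Averaging per window before visualizing keeps the number of points a dashboard has to draw independent of how fast the sensors send data.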

[Dashboard] Chronograf

Description: Chronograf is the interface component of InfluxData, created to make it easy to visualize and explore this data.

Why? For a real-time dashboard, we want to see our events with less than 1 minute of delay. Chronograf does just this: it allows us to set a refresh interval of 5 seconds, which covers our need of showing this data in near real-time. For an even shorter refresh interval, we would have to look at socket-based solutions, which are quite hard to come by at the moment. Note that Chronograf requires us to use InfluxDB, which implies a small lock-in at this level. If you wish to avoid this, feel free to continue with Grafana as shown below.

Alternatives?

  • PowerBI
  • Kibana (ELK Stack)
  • Grafana

[Dashboard] Grafana

Description: Grafana is a tool for monitoring and analysing metrics through data sources such as Graphite, InfluxDB, Prometheus and many more.

Why? For a real-time dashboard, we want to see our events with less than 1 minute of delay. Grafana does just this: it allows us to set a refresh interval of 5 seconds, which covers our need of showing this data in near real-time. For an even shorter refresh interval, we would have to look at socket-based solutions, which are quite hard to come by at the moment. The major difference with Chronograf is that Grafana currently integrates with more data sources, allowing us to avoid lock-in at this level.

Alternatives?

  • PowerBI
  • Kibana (ELK Stack)
  • Chronograf

Configuring our components

InfluxDB & Grafana

To configure our InfluxDB, Grafana and Chronograf containers, feel free to check out my other post at /howto-visualize-metrics-with-grafana-and-influxdb

Event Hub

To configure our Event Hub check out my blog post “Sending and Receiving events with Azure Event Hub”.

Once you have created an Event Hub, create a consumer group and a Policy Key with the “Listen” permission. More information on this can be found here

Make sure to note down the following details:

  • PolicyName
  • PolicyKey
  • NamespaceName - This is the name of the namespace where all your EventHubs are located
  • EventHubName - This is the name of the EventHub you created
  • ConsumerGroupName - This is the consumer group that you created under your EventHub. The default name is: $Default
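These details combine into the connection string that both the Spark notebook and the sender script use. A minimal sketch, assuming the public Azure cloud (the servicebus.windows.net suffix) and a hypothetical helper named buildEventHubConnectionString; note that the ConsumerGroupName is not part of the string and is passed separately when reading.

```javascript
// Hypothetical helper: assemble an Event Hubs connection string from the noted details.
// Assumption: the namespace lives in the public Azure cloud (*.servicebus.windows.net).
function buildEventHubConnectionString({ namespaceName, policyName, policyKey, eventHubName }) {
  const parts = [
    `Endpoint=sb://${namespaceName}.servicebus.windows.net/`,
    `SharedAccessKeyName=${policyName}`,
    `SharedAccessKey=${policyKey}`,
  ];
  // EntityPath scopes the string to a single Event Hub; omit it for a namespace-level string
  if (eventHubName) parts.push(`EntityPath=${eventHubName}`);
  return parts.join(";");
}

// Placeholder values, replace them with the details you noted down
const connectionString = buildEventHubConnectionString({
  namespaceName: "<NamespaceName>",
  policyName: "<PolicyName>",
  policyKey: "<PolicyKey>",
  eventHubName: "<EventHubName>",
});
console.log(connectionString);
```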

Databricks

To set up Databricks, create an account by following the awesome documentation

Now we can log in to our Databricks portal and start creating the script that will ingest our data from EventHub. Start by creating your first notebook (I am using Scala here), and let’s see what we need to put in it to connect to our event stream and process the data for our visualization tools.

Connecting to EventHub

Before we can connect to EventHub, we need to add a Maven library. For this, click "Workspace" -> "Shared" -> under the arrow click "Create" -> "Library". Now search for the Maven coordinate com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.2, then install it and attach it to the cluster.

Once all this pre-setup is done, we can go to our notebook and connect to our EventHub with the following bit of code:

// Note: Before you can use these packages you need to add the Maven coordinate: com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.2
//       See https://docs.databricks.com/user-guide/libraries.html#maven-libraries for how to do so
//       More info: https://docs.databricks.com/spark/latest/structured-streaming/streaming-event-hubs.html
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }

// Connect to EventHub
// The connection string looks like: Endpoint=sb://<NamespaceName>.servicebus.windows.net/;SharedAccessKeyName=<PolicyName>;SharedAccessKey=<PolicyKey>
val connectionString = ConnectionStringBuilder("<your_connection_string>")
  .setEventHubName("<your_eh_name>")
  .build

val eventHubsConf = EventHubsConf(connectionString)
  .setConsumerGroup("<your_consumer_group>") // e.g. "$Default"
  .setStartingPosition(EventPosition.fromEndOfStream) // only read events enqueued after the query starts

// eventHubs is an org.apache.spark.sql.DataFrame object (a streaming DataFrame)
val eventHubs = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .load()

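If we now run this cell, we will see our variables initialized. Note that nothing is read from EventHub yet: a streaming DataFrame only starts pulling data once a query is started on it.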

Performing ETL on our incoming EventHub data

To see the schema of the EventHub data coming in, we can utilize the printSchema method:

// Print schema of our stream
eventHubs.printSchema()

Here we see a field called body that contains our EventHub event data, but in a binary format. We therefore have to convert it back to our original JSON string, parse it, and cast the values to the correct types.

import org.apache.spark.sql.types._ // https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/package-summary.html
import org.apache.spark.sql.functions._
import spark.implicits._ // enables the $"column" syntax (already imported in Databricks notebooks)

// Our JSON schema: every value is read as a string and cast afterwards
val jsonSchema = new StructType()
  .add("sensor", StringType)
  .add("temperatureValue", StringType)
  .add("humidityValue", StringType)
  .add("createdAt", StringType)

// Convert our EventHub data: the body contains our message, which we decode as JSON
val messages = eventHubs
  // Parse the columns EventHub gives us (the data we are sending, plus metadata such as offset, enqueuedTime, ...)
  .withColumn("Offset", $"offset".cast(LongType))
  .withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType))
  .withColumn("Timestamp", $"enqueuedTime".cast(LongType))
  .withColumn("Body", $"body".cast(StringType))
  // Select them so we can play with them
  .select("Offset", "Time (readable)", "Timestamp", "Body")
  // Parse the "Body" column with the JSON schema defined above
  .select(from_json($"Body", jsonSchema) as "sensors")
  // Select the values from our JSON structure and cast them explicitly to avoid problems
  // (the InfluxDB writer further on relies on this column order)
  .select(
    $"sensors.sensor".cast("string") as "sensor",
    $"sensors.createdAt".cast("timestamp") as "createdAt",
    $"sensors.temperatureValue".cast("double") as "tempVal",
    $"sensors.humidityValue".cast("double") as "humVal"
  )

// Print the schema to know what we are working with
messages.printSchema()

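Interesting to note here is the two-step casting: the JSON schema reads every value as a string, and only the final select casts them to double and timestamp. This is needed because the values arrive as JSON strings (the sender script formats the numbers with toFixed, which returns a string); declaring them as DoubleType directly in the JSON schema results in an incorrect parse and null values.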
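To make the two-step cast concrete, here is a plain-JavaScript emulation of what the notebook does for a single event. It is not Spark's JSON parser: the hypothetical strictDouble only approximates declaring DoubleType in the schema, and castDouble approximates reading a string and applying cast("double"). The helper names and the sample payload are assumptions for illustration.

```javascript
// Approximates declaring a field as DoubleType in the JSON schema:
// only real JSON numbers are accepted, a quoted number becomes null.
function strictDouble(value) {
  return typeof value === "number" ? value : null;
}

// Approximates the two-step approach: read the value as a string, then cast it.
// Non-numeric or empty strings become null, like a failed cast("double").
function castDouble(text) {
  if (text === null || text === undefined || String(text).trim() === "") return null;
  const n = Number(text);
  return Number.isFinite(n) ? n : null;
}

// Decodes one Event Hub body (bytes) into the notebook's output columns.
// Returns null when the body is not valid JSON.
function toMessage(bodyBytes) {
  let json;
  try {
    json = JSON.parse(Buffer.from(bodyBytes).toString("utf8"));
  } catch (err) {
    return null;
  }
  const createdAt = new Date(json.createdAt);
  return {
    sensor: json.sensor === undefined ? null : String(json.sensor),
    createdAt: Number.isNaN(createdAt.getTime()) ? null : createdAt,
    tempVal: castDouble(json.temperatureValue),
    humVal: castDouble(json.humidityValue),
  };
}

// Hypothetical payload whose numbers are strings, as toFixed(2) produces
const body = Buffer.from(JSON.stringify({
  sensor: "sensor_3",
  temperatureValue: "21.35",
  humidityValue: "48.10",
  createdAt: "2019-01-01T10:00:00.000Z",
}));

console.log(strictDouble(JSON.parse(body.toString("utf8")).temperatureValue)); // null: the value is a quoted string
console.log(toMessage(body));
```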

For debugging, we can now write our stream to memory and check that everything is coming in successfully:

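// Debug: write the parsed stream to an in-memory table called "streamData"
val messagesStream = messages.writeStream
  .outputMode("append")
  .format("memory")
  .queryName("streamData")
  .start()

// Query the in-memory table (re-run this cell to see newly arrived rows)
display(spark.sql("SELECT * FROM streamData"))

// On Databricks, display(messages) also renders the streaming DataFrame directly.
// Stop the debug query once you are done: messagesStream.stop()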

Sending our parsed events to InfluxDB

To connect to InfluxDB, we use a library called Chronicler which can be found at: https://github.com/fsanaulla/chronicler and https://github.com/fsanaulla/chronicler-spark.

We install it as we did before, by adding the libraries to our shared folder. The following libraries need to be installed:

com.github.fsanaulla:chronicler-akka-http
com.github.fsanaulla:chronicler-async-http
com.github.fsanaulla:chronicler-url-http
com.github.fsanaulla:chronicler-udp
com.github.fsanaulla:chronicler-macros
com.github.fsanaulla:chronicler-spark-structured-streaming

Once this is done, go back to your notebook and start adding your connection code:

// Create a connection to our InfluxDB
import com.github.fsanaulla.chronicler.async.Influx
import com.github.fsanaulla.chronicler.core.model.{InfluxConfig, InfluxCredentials}

import scala.util.{Success, Failure}
import scala.concurrent.ExecutionContext.Implicits.global

implicit lazy val influxConf: InfluxConfig = InfluxConfig("<host>", 8086, Some(InfluxCredentials("<user>", "<pass>")), gzipped = false)
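Behind the host, port and credentials in this InfluxConfig sits InfluxDB's HTTP API. As a sketch, assuming InfluxDB 1.x over plain HTTP, the hypothetical influxEndpoints helper below derives the /ping endpoint (handy to check that your Databricks cluster can reach the InfluxDB container) and the /write endpoint that line protocol is posted to. This is not Chronicler's code, just the shape of the URLs involved.

```javascript
// Hypothetical helper: derive InfluxDB 1.x HTTP endpoints from the values passed to InfluxConfig.
// Assumptions: plain HTTP, InfluxDB 1.x API, credentials sent as the u/p query parameters.
function influxEndpoints({ host, port = 8086, user, pass }, db) {
  const base = `http://${host}:${port}`;

  // /write accepts line protocol in a POST body; db selects the target database
  const write = new URL("/write", base);
  write.searchParams.set("db", db);
  if (user !== undefined) {
    write.searchParams.set("u", user);
    write.searchParams.set("p", pass);
  }

  return {
    ping: new URL("/ping", base).toString(), // answers 204 No Content when InfluxDB is reachable
    write: write.toString(),
  };
}

const endpoints = influxEndpoints({ host: "localhost", user: "admin", pass: "secret" }, "myMetrics");
console.log(endpoints.ping);  // http://localhost:8086/ping
console.log(endpoints.write); // http://localhost:8086/write?db=myMetrics&u=admin&p=secret
```

Passing credentials as query parameters is convenient for testing, but they can end up in logs; InfluxDB 1.x also accepts HTTP basic authentication.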

Next, we send our events to InfluxDB. For this, we define a writer that serializes each row into the tag and field format InfluxDB requires, and then start our stream with .saveToInflux and .start.

// Save our stream to InfluxDB
//import com.github.fsanaulla.chronicler.spark.ds._ // DataFrame = DataSet[Row]
import com.github.fsanaulla.chronicler.spark.structured.streaming._
import com.github.fsanaulla.chronicler.macros.Macros
import com.github.fsanaulla.chronicler.macros.annotations.{field, tag}
import com.github.fsanaulla.chronicler.core.model.InfluxWriter
import com.github.fsanaulla.chronicler.core.model.Point
import org.apache.spark.sql.Row

// Alternative: derive the writer from an annotated case class with the macros module
//case class Entity(@tag sensor: String, @field createdAt: Int, @field tempVal: Double, @field humVal: Double)
//implicit val wr: InfluxWriter[Entity] = Macros.writer[Entity]

// Serialize each Row of `messages` (sensor, createdAt, tempVal, humVal) by column position
implicit val wr: InfluxWriter[Row] = new InfluxWriter[Row] {
  override def write(o: Row): String = {
    val sb = StringBuilder.newBuilder

    // Produces: sensor=<id> tempVal=<double>,humVal=<double>  (<tags> <fields>)
    // No timestamp is appended, so InfluxDB assigns the time at which the point is written
    sb.append(s"sensor=${o(0)}")
      .append(" ")
      .append("tempVal=")
      .append(o(2))
      .append(",")
      .append("humVal=")
      .append(o(3))

    sb.toString()
  }
}

// Create our stream
val stream = messages
  .writeStream // Create a WriteStream
  .saveToInflux("myMetrics", "myValues") // database name, measurement name
  .start()
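The writer above produces the tag and field part of an InfluxDB line protocol entry; a complete line has the form measurement[,tag=value...] field=value[,field=value...] [timestamp]. The sketch below, built around a hypothetical toLineProtocol helper, spells out that format including the escaping rules the Row writer skips (our sensor_N ids need no escaping), and it drops null fields, which the Row writer would otherwise print as the literal text null.

```javascript
// Line protocol escaping (InfluxDB 1.x):
// measurement names escape commas and spaces; tag keys, tag values and field keys
// additionally escape equals signs.
const escapeMeasurement = (s) => String(s).replace(/[, ]/g, (c) => `\\${c}`);
const escapeKey = (s) => String(s).replace(/[,= ]/g, (c) => `\\${c}`);

function formatFieldValue(v) {
  if (typeof v === "number") return String(v); // written without suffix, stored as float
  if (typeof v === "boolean") return v ? "true" : "false";
  return `"${String(v).replace(/["\\]/g, (c) => `\\${c}`)}"`; // strings are quoted, " and \ escaped
}

// Builds: measurement[,tags] fields [timestamp]; timestampNs is optional (nanoseconds)
function toLineProtocol(measurement, tags, fields, timestampNs) {
  const tagPart = Object.entries(tags)
    .filter(([, v]) => v !== null && v !== undefined && v !== "") // empty tag values are not allowed
    .map(([k, v]) => `,${escapeKey(k)}=${escapeKey(v)}`)
    .join("");
  const fieldPart = Object.entries(fields)
    .filter(([, v]) => v !== null && v !== undefined) // skip fields whose cast failed
    .map(([k, v]) => `${escapeKey(k)}=${formatFieldValue(v)}`)
    .join(",");
  if (fieldPart === "") throw new Error("line protocol requires at least one field");

  const line = `${escapeMeasurement(measurement)}${tagPart} ${fieldPart}`;
  return timestampNs === undefined ? line : `${line} ${timestampNs}`;
}

console.log(toLineProtocol("myValues", { sensor: "sensor_3" }, { tempVal: 21.35, humVal: 48.1 }));
// myValues,sensor=sensor_3 tempVal=21.35,humVal=48.1

// Using a sensor's own createdAt as the point time (epoch ms -> ns, as a BigInt to keep precision)
const createdAtNs = BigInt(Date.parse("2019-01-01T10:00:00.000Z")) * 1000000n;
console.log(toLineProtocol("myValues", { sensor: "sensor_3" }, { tempVal: 21.35 }, createdAtNs));
```

Supplying the timestamp, as in the last call, would store each point at the sensor's createdAt time instead of the moment InfluxDB receives it.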

Creating our dashboards

We now have our stream running and sending everything to our InfluxDB. After configuring our dashboards a little, we can set them up like this:

Grafana

/assets/images/posts/real-time-streaming-dashboard/grafana-setup-4-humVal.png

/assets/images/posts/real-time-streaming-dashboard/grafana-setup-5-tempVal.png

Chronograf

/assets/images/posts/real-time-streaming-dashboard/chronograf-setup-3-tempVal.png

/assets/images/posts/real-time-streaming-dashboard/chronograf-setup-4-humVal.png
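Each panel in the screenshots above is driven by an InfluxQL query against the measurement written by saveToInflux. The exact queries behind the screenshots are not part of this post, so the hypothetical buildPanelQuery helper below is only a sketch, assuming the myValues measurement, a mean per 10-second bucket over the last 15 minutes, and one series per sensor tag.

```javascript
// Hypothetical helper: builds an InfluxQL query similar to what a dashboard panel runs.
// Assumptions: measurement "myValues", one panel per field, one series per tag value.
function buildPanelQuery({ measurement, field, aggregate = "mean", lookback = "15m", interval = "10s", groupByTag }) {
  const groupBy = [`time(${interval})`];
  if (groupByTag) groupBy.push(`"${groupByTag}"`);
  return `SELECT ${aggregate}("${field}") FROM "${measurement}" ` +
    `WHERE time > now() - ${lookback} ` +
    `GROUP BY ${groupBy.join(", ")} fill(null)`;
}

console.log(buildPanelQuery({ measurement: "myValues", field: "tempVal", groupByTag: "sensor" }));
// SELECT mean("tempVal") FROM "myValues" WHERE time > now() - 15m GROUP BY time(10s), "sensor" fill(null)
```

In Grafana, the hard-coded time range and bucket size would typically be replaced by the $timeFilter and $__interval template variables, so the panel follows the dashboard's time picker.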

Conclusion

Once all of this is done, we can view our completed dashboard and have a result that looks like this:

/assets/images/posts/real-time-streaming-dashboard/grafana-setup-6-dashboard.png

/assets/images/posts/real-time-streaming-dashboard/chronograf-setup-5-dashboard.png

EventHub Sender Script

As a last point, I would like to include the script that I used to send mock data to my EventHub:

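const eventHubClient = require('azure-event-hubs').Client;
const config = require('./config.js');

// Init client
const client = eventHubClient.fromConnectionString(config.getConnectionString(), config.eventHub);

// Random value in [min, min + range), formatted with 2 decimals (toFixed returns a string)
const randomValue = (min, range) => ((Math.random() * range) + min).toFixed(2);

// Create a sender and publish one mock event per second
client.createSender()
  .then((tx) => {
    setInterval(() => {
      // Field names match the jsonSchema defined in the Databricks notebook
      const val = {
        sensor: `sensor_${Math.floor((Math.random() * 10) + 1)}`,
        temperatureValue: randomValue(-10, 41),
        humidityValue: randomValue(0, 100), // assumed range for the mock humidity
        createdAt: new Date()
      };

      tx.send(val);
      console.log(`Sent: ${JSON.stringify(val)}`);
    }, 1000);
  })
  .catch((err) => console.error('Could not create the sender:', err));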

with the config (config.js) containing:

const config = {
  eventHubUrl: "<your_url>", // no protocol included
  eventHubSharedAccessKeyName: "RootManageSharedAccessKey",
  eventHubSharedAccessKey: "<your_key>",
  // This is the EventHub name (go to eventhub -> Event Hubs -> <Names>)
  eventHub: "<your_eh>",
  eventHubPath: "<your_eh_namespace>",
};

module.exports = {
  ...config,
  getConnectionString: () => `Endpoint=sb://${config.eventHubUrl}/;SharedAccessKeyName=${config.eventHubSharedAccessKeyName};SharedAccessKey=${config.eventHubSharedAccessKey}`
}
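Because from_json yields null values rather than failing when the sender's field names or value formats drift from the notebook's jsonSchema, a quick check before sending can save debugging time. A minimal sketch with a hypothetical validatePayload helper that assumes the four fields defined in the notebook's schema:

```javascript
// Field names the notebook's jsonSchema expects (all read as strings, cast later in Spark).
// Assumption: this list mirrors the StructType defined in the Databricks notebook.
const EXPECTED_FIELDS = ["sensor", "temperatureValue", "humidityValue", "createdAt"];

// Returns a list of problems; an empty list means the payload matches the schema.
function validatePayload(payload) {
  const problems = [];
  for (const name of EXPECTED_FIELDS) {
    if (!(name in payload)) problems.push(`missing field "${name}"`);
  }
  for (const name of Object.keys(payload)) {
    if (!EXPECTED_FIELDS.includes(name)) problems.push(`unexpected field "${name}"`);
  }
  // The numeric values must survive Spark's cast("double")
  for (const name of ["temperatureValue", "humidityValue"]) {
    if (name in payload && !Number.isFinite(Number(payload[name]))) problems.push(`"${name}" is not numeric`);
  }
  if ("createdAt" in payload && Number.isNaN(Date.parse(payload.createdAt))) {
    problems.push(`"createdAt" is not a valid date`);
  }
  return problems;
}

const example = { sensor: "sensor_1", temperatureValue: "21.35", humidityValue: "48.10", createdAt: new Date().toISOString() };
console.log(validatePayload(example)); // []
```

Calling it right before tx.send and logging any problems makes a schema mismatch visible on the sending side instead of as empty values on the dashboard.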

Xavier Geerinck

Xavier works as a Cloud Solution Architect at Microsoft, helping its customers unlock the full potential of the cloud. Even though he is still considered a young graduate, he achieved his first success at the age of 16 by creating and selling his first startup. He then took this knowledge to create and help more startups in different markets such as technology, social media, philanthropy and home care, while gaining enterprise insights at renowned companies such as Nokia, Cisco and now Microsoft.
