{"info":{"_postman_id":"c5e9ec44-1e49-4561-8b40-9c8636838f08","name":"FIWARE Real-time Processing (Spark)","description":"<html><head></head><body><p><a href=\"https://github.com/FIWARE/catalogue/blob/master/processing/README.md\"><img src=\"https://nexus.lab.fiware.org/static/badges/chapters/core.svg\" alt=\"FIWARE Core Context Management\"></a>\n<a href=\"https://fiware-ges.github.io/orion/api/v2/stable/\"><img src=\"https://img.shields.io/badge/NGSI-v2-5dc0cf.svg\" alt=\"NGSI v2\"></a></p>\n<p>This tutorial is an introduction to the <a href=\"http://fiware-cosmos-spark.rtfd.io\">FIWARE Cosmos Orion Spark Connector</a>, which\nenables easier Big Data analysis over context, integrated with one of the most popular BigData platforms:\n<a href=\"https://spark.apache.org/\">Apache Spark</a>. Apache Spark is a framework and distributed processing engine for stateful\ncomputations over unbounded and bounded data streams. Spark has been designed to run in all common cluster environments,\nperform computations at in-memory speed and at any scale.</p>\n<p>The <code>docker-compose</code> files for this tutorial can be found on GitHub: </p>\n<p><img src=\"https://fiware.github.io/tutorials.Big-Data-Spark/icon/GitHub-Mark-32px.png\" alt=\"GitHub\"> <a href=\"https://github.com/FIWARE/tutorials.Big-Data-Spark\">https://github.com/FIWARE/tutorials.Big-Data-Spark</a></p>\n<h1 id=\"real-time-processing-and-big-data-analysis\">Real-time Processing and Big Data Analysis</h1>\n<blockquote>\n<p>\"You have to find what sparks a light in you so that you in your own way can illuminate the world.\"</p>\n<p>— Oprah Winfrey</p>\n</blockquote>\n<p>Smart solutions based on FIWARE are architecturally designed around microservices. 
They are therefore designed to\nscale up from simple applications (such as the Supermarket tutorial) through to city-wide installations based on a large\narray of IoT sensors and other context data providers.</p>\n<p>The massive amount of data involved eventually becomes too much for a single machine to analyse, process and store, and\ntherefore the work must be delegated to additional distributed services. These distributed systems form the basis of\nso-called <strong>Big Data Analysis</strong>. The distribution of tasks allows developers to extract insights from huge\ndata sets which would be too complex to be dealt with using traditional methods, and to uncover hidden patterns and\ncorrelations.</p>\n<p>As we have seen, context data is core to any Smart Solution, and the Context Broker is able to monitor changes of state\nand raise <a href=\"https://github.com/Fiware/tutorials.Subscriptions\">subscription events</a> as the context changes. For smaller\ninstallations, each subscription event can be processed one-by-one by a single receiving endpoint; however, as the system\ngrows, another technique will be required to avoid overwhelming the listener, potentially blocking resources and missing\nupdates.</p>\n<p><strong>Apache Spark</strong> is an open-source distributed general-purpose cluster-computing framework. It provides an interface for\nprogramming entire clusters with implicit data parallelism and fault tolerance. The <strong>Cosmos Spark</strong> connector allows\ndevelopers to write custom business logic to listen for context data subscription events and then process the flow of the\ncontext data. Spark is able to delegate these actions to other workers where they will be acted upon either\nsequentially or in parallel as required. 
The data flow processing itself can be arbitrarily complex.</p>\n<p>Obviously, in reality, our existing Supermarket scenario is far too small to require the use of a Big Data solution, but\nwill serve as a basis for demonstrating the type of real-time processing which may be required in a larger solution\nwhich is processing a continuous stream of context-data events.</p>\n<h1 id=\"architecture\">Architecture</h1>\n<p>This application builds on the components and dummy IoT devices created in\n<a href=\"https://github.com/FIWARE/tutorials.IoT-Agent/\">previous tutorials</a>. It will make use of three FIWARE components - the\n<a href=\"https://fiware-orion.readthedocs.io/en/latest/\">Orion Context Broker</a>, the\n<a href=\"https://fiware-iotagent-ul.readthedocs.io/en/latest/\">IoT Agent for Ultralight 2.0</a>, and the\n<a href=\"https://fiware-cosmos-spark.readthedocs.io/en/latest/\">Cosmos Orion Spark Connector</a> for connecting Orion to an\n<a href=\"https://spark.apache.org/docs/latest/cluster-overview.html\">Apache Spark cluster</a>. The Spark cluster itself will\nconsist of a single <strong>Cluster Manager</strong> <em>master</em> to coordinate execution and some <strong>Worker Nodes</strong> <em>worker</em> to execute\nthe tasks.</p>\n<p>Both the Orion Context Broker and the IoT Agent rely on open source <a href=\"https://www.mongodb.com/\">MongoDB</a> technology to\nkeep persistence of the information they hold. 
We will also be using the dummy IoT devices created in the\n<a href=\"https://github.com/FIWARE/tutorials.IoT-Agent/\">previous tutorial</a>.</p>\n<p>Therefore the overall architecture will consist of the following elements:</p>\n<ul>\n<li>Two <strong>FIWARE Generic Enablers</strong> as independent microservices:<ul>\n<li>The FIWARE <a href=\"https://fiware-orion.readthedocs.io/en/latest/\">Orion Context Broker</a> which will receive requests\nusing <a href=\"https://fiware.github.io/specifications/OpenAPI/ngsiv2\">NGSI</a></li>\n<li>The FIWARE <a href=\"https://fiware-iotagent-ul.readthedocs.io/en/latest/\">IoT Agent for Ultralight 2.0</a> which will\nreceive northbound measurements from the dummy IoT devices in\n<a href=\"https://fiware-iotagent-ul.readthedocs.io/en/latest/usermanual/index.html#user-programmers-manual\">Ultralight 2.0</a>\nformat and convert them to <a href=\"https://fiware.github.io/specifications/OpenAPI/ngsiv2\">NGSI</a> requests for the\ncontext broker to alter the state of the context entities</li>\n</ul>\n</li>\n<li>An <a href=\"https://spark.apache.org/docs/latest/cluster-overview.html\">Apache Spark cluster</a> consisting of a single\n<strong>Cluster Manager</strong> and <strong>Worker Nodes</strong><ul>\n<li>The FIWARE <a href=\"https://fiware-cosmos-spark.readthedocs.io/en/latest/\">Cosmos Orion Spark Connector</a> will be\ndeployed as part of the dataflow which will subscribe to context changes and perform operations on them in\nreal-time</li>\n</ul>\n</li>\n<li>One <a href=\"https://www.mongodb.com/\">MongoDB</a> <strong>database</strong>:<ul>\n<li>Used by the <strong>Orion Context Broker</strong> to hold context data information such as data entities, subscriptions and\nregistrations</li>\n<li>Used by the <strong>IoT Agent</strong> to hold device information such as device URLs and Keys</li>\n</ul>\n</li>\n<li>Three <strong>Context Providers</strong>:<ul>\n<li>A webserver acting as a set of <a 
href=\"https://github.com/FIWARE/tutorials.IoT-Sensors\">dummy IoT devices</a> using the\n<a href=\"https://fiware-iotagent-ul.readthedocs.io/en/latest/usermanual/index.html#user-programmers-manual\">Ultralight 2.0</a>\nprotocol running over HTTP.</li>\n<li>The <strong>Stock Management Frontend</strong> is not used in this tutorial. It does the following:<ul>\n<li>Display store information and allow users to interact with the dummy IoT devices</li>\n<li>Show which products can be bought at each store</li>\n<li>Allow users to \"buy\" products and reduce the stock count.</li>\n</ul>\n</li>\n<li>The <strong>Context Provider NGSI</strong> proxy is not used in this tutorial. It does the following:<ul>\n<li>receive requests using <a href=\"https://fiware.github.io/specifications/OpenAPI/ngsiv2\">NGSI</a></li>\n<li>makes requests to publicly available data sources using their own APIs in a proprietary format</li>\n<li>returns context data back to the Orion Context Broker in\n<a href=\"https://fiware.github.io/specifications/OpenAPI/ngsiv2\">NGSI</a> format.</li>\n</ul>\n</li>\n</ul>\n</li>\n</ul>\n<p>The overall architecture can be seen below:</p>\n<p><img src=\"https://fiware.github.io/tutorials.Big-Data-Spark/img/Tutorial%20FIWARE%20Spark.png\" alt=\"\"></p>\n<h2 id=\"spark-cluster-configuration\">Spark Cluster Configuration</h2>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-yaml\">spark-master:\n    image: bde2020/spark-master:2.4.5-hadoop2.7\n    container_name: spark-master\n    expose:\n        - \"8080\"\n        - \"9001\"\n    ports:\n        - \"8080:8080\"\n        - \"7077:7077\"\n        - \"9001:9001\"\n    environment:\n        - INIT_DAEMON_STEP=setup_spark\n        - \"constraint:node==spark-master\"\n</code></pre>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-yaml\">spark-worker-1:\n    image: bde2020/spark-worker:2.4.5-hadoop2.7\n    container_name: spark-worker-1\n    depends_on:\n 
       - spark-master\n    ports:\n        - \"8081:8081\"\n    environment:\n        - \"SPARK_MASTER=spark://spark-master:7077\"\n        - \"constraint:node==spark-master\"\n</code></pre>\n<p>The <code>spark-master</code> container is listening on the following ports:</p>\n<ul>\n<li>Port <code>8080</code> is exposed so we can see the web frontend of the Apache Spark-Master Dashboard.</li>\n<li>Port <code>7077</code> is used for internal communications.</li>\n</ul>\n<p>The <code>spark-worker-1</code> container is listening on two ports:</p>\n<ul>\n<li>Port <code>9001</code> is exposed so that the installation can receive context data subscriptions.</li>\n<li>Port <code>8081</code> is exposed so we can see the web frontend of the Apache Spark-Worker-1 Dashboard.</li>\n</ul>\n<h1 id=\"prerequisites\">Prerequisites</h1>\n<h2 id=\"docker-and-docker-compose\">Docker and Docker Compose</h2>\n<p>To keep things simple, all components will be run using <a href=\"https://www.docker.com\">Docker</a>. <strong>Docker</strong> is a container\ntechnology which allows different components to be isolated into their respective environments.</p>\n<ul>\n<li>To install Docker on Windows follow the instructions <a href=\"https://docs.docker.com/docker-for-windows/\">here</a></li>\n<li>To install Docker on Mac follow the instructions <a href=\"https://docs.docker.com/docker-for-mac/\">here</a></li>\n<li>To install Docker on Linux follow the instructions <a href=\"https://docs.docker.com/install/\">here</a></li>\n</ul>\n<p><strong>Docker Compose</strong> is a tool for defining and running multi-container Docker applications. A series of\n<a href=\"https://github.com/FIWARE/tutorials.Big-Data-Spark/tree/master/docker-compose\">YAML files</a> are used to configure the\nrequired services for the application. This means all container services can be brought up in a single command. 
Docker\nCompose is installed by default as part of Docker for Windows and Docker for Mac, however Linux users will need to\nfollow the instructions found <a href=\"https://docs.docker.com/compose/install/\">here</a></p>\n<p>You can check your current <strong>Docker</strong> and <strong>Docker Compose</strong> versions using the following commands:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-console\">docker-compose -v\ndocker version\n</code></pre>\n<p>Please ensure that you are using Docker version 18.03 or higher and Docker Compose 1.21 or higher and upgrade if\nnecessary.</p>\n<h2 id=\"maven\">Maven</h2>\n<p><a href=\"https://maven.apache.org/download.cgi\">Apache Maven</a> is a software project management and comprehension tool. Based on\nthe concept of a project object model (POM), Maven can manage a project's build, reporting and documentation from a\ncentral piece of information. We will use Maven to define and download our dependencies and to build and package our\ncode into a JAR file.</p>\n<h2 id=\"cygwin-for-windows\">Cygwin for Windows</h2>\n<p>We will start up our services using a simple Bash script. Windows users should download <a href=\"http://www.cygwin.com/\">cygwin</a>\nto provide a command-line functionality similar to a Linux distribution on Windows.</p>\n<h1 id=\"start-up\">Start Up</h1>\n<p>Before you start, you should ensure that you have obtained or built the necessary Docker images locally. Please clone\nthe repository and create the necessary images by running the commands shown below. 
Note that you might need to run some\nof the commands as a privileged user:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-console\">git clone https://github.com/ging/fiware-cosmos-orion-spark-connector-tutorial.git\ncd fiware-cosmos-orion-spark-connector-tutorial\n./services create\n</code></pre>\n<p>This command will also import seed data from the previous tutorials and provision the dummy IoT sensors on startup.</p>\n<p>To start the system, run the following command:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-console\">./services start\n</code></pre>\n<blockquote>\n<p>:information_source: <strong>Note:</strong> If you want to clean up and start over again you can do so with the following command:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-console\">./services stop\n</code></pre>\n</blockquote>\n</body></html>","schema":"https://schema.getpostman.com/json/collection/v2.0.0/collection.json","toc":[{"content":"Real-time Processing and Big Data Analysis","slug":"real-time-processing-and-big-data-analysis"},{"content":"Architecture","slug":"architecture"},{"content":"Prerequisites","slug":"prerequisites"},{"content":"Start Up","slug":"start-up"}],"owner":"513743","collectionId":"c5e9ec44-1e49-4561-8b40-9c8636838f08","publishedId":"TVev6kks","public":true,"customColor":{"top-bar":"FFFFFF","right-sidebar":"303030","highlight":"233C68"},"publishDate":"2020-11-23T13:19:12.000Z"},"item":[{"name":"Receiving context data and performing operations","item":[{"name":"Orion - Subscribe to Context Changes","id":"0dc1fc51-4784-4d63-b669-92e0453d20a5","protocolProfileBehavior":{"disableBodyPruning":true},"request":{"method":"POST","header":[{"key":"Content-Type","value":"application/json"},{"key":"fiware-service","value":"openiot"},{"key":"fiware-servicepath","value":"/"}],"body":{"mode":"raw","raw":"{\n  \"description\": \"Notify Spark of all context 
changes\",\n  \"subject\": {\n    \"entities\": [\n      {\n        \"idPattern\": \".*\"\n      }\n    ]\n  },\n  \"notification\": {\n    \"http\": {\n      \"url\": \"http://spark-worker-1:9001\"\n    }\n  }\n}"},"url":"http://localhost:1026/v2/subscriptions/","description":"<p>Once a dynamic context system is up and running (we have deployed the <code>Logger</code> job in the Spark cluster), we need to\ninform <strong>Spark</strong> of changes in context.</p>\n<p>This is done by making a POST request to the <code>/v2/subscriptions</code> endpoint of the Orion Context Broker.</p>\n<ul>\n<li><p>The <code>fiware-service</code> and <code>fiware-servicepath</code> headers are used to filter the subscription to only listen to\nmeasurements from the attached IoT Sensors, since they were provisioned using these settings.</p>\n</li>\n<li><p>The notification <code>url</code> must match the one our Spark program is listening to.</p>\n</li>\n<li><p>An optional <code>throttling</code> value can be added to define the rate at which changes are sampled; it is not set in this request.</p>\n</li>\n</ul>\n","urlObject":{"protocol":"http","path":["v2","subscriptions",""],"host":["localhost:1026"],"query":[],"variable":[]}},"response":[],"_postman_id":"0dc1fc51-4784-4d63-b669-92e0453d20a5"},{"name":"Orion - Check Subscription is working","id":"8b8d48ed-761e-440b-9a87-1df0b87c5f6d","protocolProfileBehavior":{"disableBodyPruning":true},"request":{"method":"GET","header":[{"key":"fiware-service","value":"openiot"},{"key":"fiware-servicepath","value":"/"}],"url":"http://localhost:1026/v2/subscriptions/","description":"<p>If a subscription has been created, we can check to see if it is firing by making a GET request to the\n<code>/v2/subscriptions</code> endpoint.</p>\n<p>Within the <code>notification</code> section of the response, you can see several additional <code>attributes</code> which describe the health\nof the subscription.</p>\n<p>If the criteria of the subscription have been met, <code>timesSent</code> should be greater than 
<code>0</code>. A zero value would indicate\nthat the <code>subject</code> of the subscription is incorrect or the subscription has been created with the wrong <code>fiware-servicepath</code>\nor <code>fiware-service</code> header.</p>\n<p>The <code>lastNotification</code> should be a recent timestamp - if this is not the case, then the devices are not regularly\nsending data. Remember to unlock the <strong>Smart Door</strong> and switch on the <strong>Smart Lamp</strong>.</p>\n<p>The <code>lastSuccess</code> should match the <code>lastNotification</code> date - if this is not the case then <strong>Cosmos</strong> is not receiving\nthe subscription properly. Check that the hostname and port are correct.</p>\n<p>Finally, check that the <code>status</code> of the subscription is <code>active</code> - an expired subscription will not fire.</p>\n","urlObject":{"protocol":"http","path":["v2","subscriptions",""],"host":["localhost:1026"],"query":[],"variable":[]}},"response":[],"_postman_id":"8b8d48ed-761e-440b-9a87-1df0b87c5f6d"}],"id":"79594075-157b-47f6-83c0-e2ce0e78d0ba","description":"<p>According to the <a href=\"https://spark.apache.org/documentation.html\">Apache Spark documentation</a>, Spark Streaming is an\nextension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data\nstreams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using\ncomplex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be\npushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph\nprocessing algorithms on data streams.</p>\n<p><img src=\"https://spark.apache.org/docs/latest/img/streaming-arch.png\" alt /></p>\n<p>Internally, it works as follows. 
Spark Streaming receives live input data streams and divides the data into batches,\nwhich are then processed by the Spark engine to generate the final stream of results in batches.</p>\n<p><img src=\"https://spark.apache.org/docs/latest/img/streaming-flow.png\" alt /></p>\n<p>This means that to create a streaming data flow we must supply the following:</p>\n<ul>\n<li>A mechanism for reading Context data as a <strong>Source Operator</strong></li>\n<li>Business logic to define the transform operations</li>\n<li>A mechanism for pushing Context data back to the context broker as a <strong>Sink Operator</strong></li>\n</ul>\n<p>The <strong>Cosmos Spark</strong> connector (<code>orion.spark.connector-1.2.1.jar</code>) offers both <strong>Source</strong> and <strong>Sink</strong> operators. It\ntherefore only remains to write the necessary Scala code to connect the streaming dataflow pipeline operations together.\nThe processing code can be compiled into a JAR file which can be uploaded to the Spark cluster. 
Two examples will be\ndetailed below, all the source code for this tutorial can be found within the\n<a href=\"https://github.com/ging/fiware-cosmos-orion-spark-connector-tutorial/tree/master/cosmos-examples\">cosmos-examples</a>\ndirectory.</p>\n<p>Further Spark processing examples can be found on\n<a href=\"https://fiware-cosmos-spark-examples.readthedocs.io/\">Spark Connector Examples</a>.</p>\n<h3 id=\"compiling-a-jar-file-for-spark\">Compiling a JAR file for Spark</h3>\n<p>An existing <code>pom.xml</code> file has been created which holds the necessary prerequisites to build the examples JAR file</p>\n<p>In order to use the Orion Spark Connector we first need to manually install the connector JAR as an artifact using\nMaven:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-console\">cd cosmos-examples\ncurl -LO https://github.com/ging/fiware-cosmos-orion-spark-connector/releases/download/FIWARE_7.9/orion.spark.connector-1.2.1.jar\nmvn install:install-file \\\n  -Dfile=./orion.spark.connector-1.2.1.jar \\\n  -DgroupId=org.fiware.cosmos \\\n  -DartifactId=orion.spark.connector \\\n  -Dversion=1.2.1 \\\n  -Dpackaging=jar\n</code></pre>\n<p>Thereafter the source code can be compiled by running the <code>mvn package</code> command within the same directory\n(<code>cosmos-examples</code>):</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-console\">mvn package\n</code></pre>\n<p>A new JAR file called <code>cosmos-examples-1.2.1.jar</code> will be created within the <code>cosmos-examples/target</code> directory.</p>\n<h3 id=\"generating-a-stream-of-context-data\">Generating a stream of Context Data</h3>\n<p>For the purpose of this tutorial, we must be monitoring a system in which the context is periodically being updated. The\ndummy IoT Sensors can be used to do this. 
Open the device monitor page at <code>http://localhost:3000/device/monitor</code> and\nunlock a <strong>Smart Door</strong> and switch on a <strong>Smart Lamp</strong>. This can be done by selecting the appropriate command from\nthe drop-down list and pressing the <code>send</code> button. The stream of measurements coming from the devices can then be seen\non the same page:</p>\n<p><img src=\"https://fiware.github.io/tutorials.Big-Data-Spark/img/door-open.gif\" alt /></p>\n<h2 id=\"logger---reading-context-data-streams\">Logger - Reading Context Data Streams</h2>\n<p>The first example makes use of the <code>OrionReceiver</code> operator in order to receive notifications from the Orion Context\nBroker. Specifically, the example counts the number of notifications that each type of device sends in one minute. You can\nfind the source code of the example in\n<a href=\"https://github.com/ging/fiware-cosmos-orion-spark-connector-tutorial/blob/master/cosmos-examples/src/main/scala/org/fiware/cosmos/tutorial/Logger.scala\">org/fiware/cosmos/tutorial/Logger.scala</a></p>\n<h3 id=\"logger---installing-the-jar\">Logger - Installing the JAR</h3>\n<p>Restart the containers if necessary, then access the worker container:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-console\">docker exec -it spark-worker-1 bin/bash\n</code></pre>\n<p>Then run the following command to submit the generated JAR package to the Spark cluster:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-console\">/spark/bin/spark-submit \\\n--class  org.fiware.cosmos.tutorial.Logger \\\n--master  spark://spark-master:7077 \\\n--deploy-mode client /home/cosmos-examples/target/cosmos-examples-1.2.1.jar \\\n--conf 
\"spark.driver.extraJavaOptions=-Dlog4jspark.root.logger=WARN,console\"\n</code></pre>\n","event":[{"listen":"prerequest","script":{"id":"0477a1da-a1f5-42f3-a91e-fc90c4ed139a","type":"text/javascript","exec":[""]}},{"listen":"test","script":{"id":"121ba677-9693-4482-89b3-2b30dc08fbf8","type":"text/javascript","exec":[""]}}],"_postman_id":"79594075-157b-47f6-83c0-e2ce0e78d0ba"},{"name":"Logger - Checking the Output","item":[],"id":"a2805aee-c239-4099-a6fb-388b86139460","description":"<p>Leave the subscription running for <strong>one minute</strong>. Then, the output on the console on which you ran the Spark job will\nbe like the following:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-text\">Sensor(Bell,3)\nSensor(Door,4)\nSensor(Lamp,7)\nSensor(Motion,6)\n</code></pre>\n<h3 id=\"logger---analyzing-the-code\">Logger - Analyzing the Code</h3>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-scala\">package org.fiware.cosmos.tutorial\n\nimport org.apache.spark.SparkConf\nimport org.apache.spark.streaming.{Seconds, StreamingContext}\nimport org.fiware.cosmos.orion.spark.connector.OrionReceiver\n\nobject Logger{\n\n  def main(args: Array[String]): Unit = {\n\n    val conf = new SparkConf().setAppName(\"Logger\")\n    val ssc = new StreamingContext(conf, Seconds(60))\n    // Create Orion Receiver. Receive notifications on port 9001\n    val eventStream = ssc.receiverStream(new OrionReceiver(9001))\n\n    // Process event stream\n    val processedDataStream= eventStream\n      .flatMap(event =&gt; event.entities)\n      .map(ent =&gt; {\n        new Sensor(ent.`type`)\n      })\n      .countByValue()\n      .window(Seconds(60))\n\n    processedDataStream.print()\n\n    ssc.start()\n    ssc.awaitTermination()\n  }\n  case class Sensor(device: String)\n}\n</code></pre>\n<p>The first lines of the program are aimed at importing the necessary dependencies, including the connector. 
The next step\nis to create an instance of the <code>OrionReceiver</code> using the class provided by the connector and to add it to the\nstreaming context (<code>StreamingContext</code>) provided by Spark.</p>\n<p>The <code>OrionReceiver</code> constructor accepts a port number (<code>9001</code>) as a parameter. This port is used to listen to the\nsubscription notifications coming from Orion, which are converted to a <code>DStream</code> of <code>NgsiEvent</code> objects. The definition of\nthese objects can be found within the\n<a href=\"https://github.com/ging/fiware-cosmos-orion-spark-connector/blob/master/README.md#orionreceiver\">Orion-Spark Connector documentation</a>.</p>\n<p>The stream processing consists of five separate steps. The first step (<code>flatMap()</code>) is performed in order to put\ntogether the entity objects of all the NGSI Events received in a period of time. Thereafter the code iterates over them\n(with the <code>map()</code> operation) and extracts the desired attributes. In this case, we are interested in the sensor <code>type</code>\n(<code>Door</code>, <code>Motion</code>, <code>Bell</code> or <code>Lamp</code>).</p>\n<p>Within each iteration, we create a custom object with the property we need: the sensor <code>type</code>. 
For this purpose, we can\ndefine a case class as shown:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-scala\">case class Sensor(device: String)\n</code></pre>\n<p>Thereafter we can count the created objects by the type of device (<code>countByValue()</code>) and perform operations such as\n<code>window()</code> on them.</p>\n<p>After the processing, the results are output to the console:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-scala\">processedDataStream.print()\n</code></pre>\n","_postman_id":"a2805aee-c239-4099-a6fb-388b86139460"},{"name":"Receiving context data, performing operations and persisting context data","item":[{"name":"Orion - Subscribe to Context Changes","id":"84d6c8a8-eefc-4c10-9f69-ca78a824b540","protocolProfileBehavior":{"disableBodyPruning":true},"request":{"method":"POST","header":[{"key":"Content-Type","value":"application/json"},{"key":"fiware-service","value":"openiot"},{"key":"fiware-servicepath","value":"/"}],"body":{"mode":"raw","raw":"{\n  \"description\": \"Notify Spark of all motion sensor\",\n  \"subject\": {\n    \"entities\": [\n      {\n        \"idPattern\": \"Motion.*\"\n      }\n    ]\n  },\n  \"notification\": {\n    \"http\": {\n      \"url\": \"http://spark-worker-1:9001\"\n    }\n  }\n}"},"url":"http://localhost:1026/v2/subscriptions/","description":"<p>If the previous example has not been run, a new subscription will need to be set up. A narrower subscription can be set\nup to only trigger a notification when a motion sensor detects movement.</p>\n<blockquote>\n<p><strong>Note:</strong> If the previous subscription already exists, this step, which creates a second, narrower Motion-only subscription,\nis unnecessary. 
There is a filter within the business logic of the Scala task itself.</p>\n</blockquote>\n","urlObject":{"protocol":"http","path":["v2","subscriptions",""],"host":["localhost:1026"],"query":[],"variable":[]}},"response":[],"_postman_id":"84d6c8a8-eefc-4c10-9f69-ca78a824b540"},{"name":"Orion - Check Subscription is working","id":"3a1094c5-debd-4dff-a962-8df426744d1a","protocolProfileBehavior":{"disableBodyPruning":true},"request":{"method":"GET","header":[{"key":"fiware-service","value":"openiot"},{"key":"fiware-servicepath","value":"/"}],"url":"http://localhost:1026/v2/subscriptions/","description":"<p>If a subscription has been created, you can check to see if it is firing by making a GET \nrequest to the <code>/v2/subscriptions</code> endpoint.</p>\n<p>Within the <code>notification</code> section of the response, you can see several additional <code>attributes</code> which describe the health of the subscription.</p>\n<p>If the criteria of the subscription have been met, <code>timesSent</code> should be greater than <code>0</code>.\nA zero value would indicate that the <code>subject</code> of the subscription is incorrect or the subscription \nhas been created with the wrong <code>fiware-servicepath</code> or <code>fiware-service</code> header.</p>\n<p>The <code>lastNotification</code> should be a recent timestamp - if this is not the case, then the devices\nare not regularly sending data. Remember to unlock the <strong>Smart Door</strong> and switch on the <strong>Smart Lamp</strong>.</p>\n<p>The <code>lastSuccess</code> should match the <code>lastNotification</code> date - if this is not the case \nthen <strong>Cosmos</strong> is not receiving the subscription properly. Check that the host name\nand port are correct. 
</p>\n<p>Finally, check that the <code>status</code> of the subscription is <code>active</code> - an expired subscription\nwill not fire.</p>\n","urlObject":{"protocol":"http","path":["v2","subscriptions",""],"host":["localhost:1026"],"query":[],"variable":[]}},"response":[],"_postman_id":"3a1094c5-debd-4dff-a962-8df426744d1a"},{"name":"Orion - Delete Subscription","id":"548a773a-df1a-404e-b9cc-86748d24b3f5","protocolProfileBehavior":{"disableBodyPruning":true},"request":{"method":"DELETE","header":[{"key":"fiware-service","value":"openiot"},{"key":"fiware-servicepath","value":"/"}],"body":{"mode":"raw","raw":""},"url":"http://localhost:1026/v2/subscriptions/5e134a0c924f6d7d27b63844","description":"<p>If a subscription is no longer required, it can be removed by making a DELETE request to the\n<code>/v2/subscriptions/&lt;subscription-id&gt;</code> endpoint.</p>\n<p>The subscription id at the end of the URL (<code>5e134a0c924f6d7d27b63844</code> in this request) must be replaced with the <code>id</code> of an\nexisting subscription, which is returned by a GET request to the <code>/v2/subscriptions</code> endpoint.</p>\n<p>As with the other requests, the <code>fiware-service</code> and <code>fiware-servicepath</code> headers must match those used when the\nsubscription was created.</p>\n","urlObject":{"protocol":"http","path":["v2","subscriptions","5e134a0c924f6d7d27b63844"],"host":["localhost:1026"],"query":[],"variable":[]}},"response":[],"_postman_id":"548a773a-df1a-404e-b9cc-86748d24b3f5"}],"id":"930520c3-236e-4d6c-b3d4-bd9c72e547cc","description":"<p>The second example switches on a lamp when its motion sensor detects movement.</p>\n<p>The dataflow stream uses the <code>OrionReceiver</code> operator in order to receive notifications, filters the input to only\nrespond to motion sensors, and then uses the <code>OrionSink</code> to push processed context back to the Context Broker. You can\nfind the source code of the example in\n<a href=\"https://github.com/ging/fiware-cosmos-orion-spark-connector-tutorial/blob/master/cosmos-examples/src/main/scala/org/fiware/cosmos/tutorial/Feedback.scala\">org/fiware/cosmos/tutorial/Feedback.scala</a></p>\n<h3 id=\"feedback-loop---installing-the-jar\">Feedback Loop - Installing the JAR</h3>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-console\">/spark/bin/spark-submit  \\\n--class  org.fiware.cosmos.tutorial.Feedback \\\n--master  spark://spark-master:7077 \\\n--deploy-mode client /home/cosmos-examples/target/cosmos-examples-1.2.1.jar \\\n--conf \"spark.driver.extraJavaOptions=-Dlog4jspark.root.logger=WARN,console\"\n</code></pre>\n","event":[{"listen":"prerequest","script":{"id":"304ca99a-e4d0-42b1-bb12-c622a653d9af","type":"text/javascript","exec":[""]}},{"listen":"test","script":{"id":"cb7aca3f-0b0a-4afa-850b-af050c4240f7","type":"text/javascript","exec":[""]}}],"_postman_id":"930520c3-236e-4d6c-b3d4-bd9c72e547cc"},{"name":"Feedback Loop - Checking the Output","item":[],"id":"1f21c8c7-351b-4803-a2d6-f18542618756","description":"<p>Go to <code>http://localhost:3000/device/monitor</code></p>\n<p>Within any Store, unlock the 
door and wait. Once the door opens and the Motion sensor is triggered, the lamp will switch\non immediately.</p>\n<h3 id=\"feedback-loop---analyzing-the-code\">Feedback Loop - Analyzing the Code</h3>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-scala\">package org.fiware.cosmos.tutorial\n\nimport org.apache.spark.SparkConf\nimport org.apache.spark.streaming.{Seconds, StreamingContext}\nimport org.fiware.cosmos.orion.spark.connector._\n\nobject Feedback {\n  final val CONTENT_TYPE = ContentType.JSON\n  final val METHOD = HTTPMethod.PATCH\n  final val CONTENT = \"{\\n  \\\"on\\\": {\\n      \\\"type\\\" : \\\"command\\\",\\n      \\\"value\\\" : \\\"\\\"\\n  }\\n}\"\n  final val HEADERS = Map(\"fiware-service\" -&gt; \"openiot\",\"fiware-servicepath\" -&gt; \"/\",\"Accept\" -&gt; \"*/*\")\n\n  def main(args: Array[String]): Unit = {\n\n    val conf = new SparkConf().setAppName(\"Feedback\")\n    val ssc = new StreamingContext(conf, Seconds(10))\n    // Create Orion Receiver. Receive notifications on port 9001\n    val eventStream = ssc.receiverStream(new OrionReceiver(9001))\n\n    // Process event stream\n    val processedDataStream = eventStream\n      .flatMap(event =&gt; event.entities)\n      .filter(entity=&gt;(entity.attrs(\"count\").value == \"1\"))\n      .map(entity=&gt; new Sensor(entity.id))\n      .window(Seconds(10))\n\n    // Build one PATCH request per sensor, addressed to the lamp with the same numeric suffix\n    val sinkStream= processedDataStream.map(sensor =&gt; {\n      val url=\"http://localhost:1026/v2/entities/Lamp:\"+sensor.id.takeRight(3)+\"/attrs\"\n      OrionSinkObject(CONTENT,url,CONTENT_TYPE,METHOD,HEADERS)\n    })\n    // Add Orion Sink\n    OrionSink.addSink( sinkStream )\n\n    // Print the first elements of each batch to the console\n    processedDataStream.print()\n    ssc.start()\n\n    ssc.awaitTermination()\n  }\n\n  case class Sensor(id: String)\n}\n</code></pre>\n<p>As you can see, it is similar to the previous example. 
The main difference is that it writes the processed data back to\nthe Context Broker through the <strong><code>OrionSink</code></strong>.</p>\n<p>The arguments of the <strong><code>OrionSinkObject</code></strong> are:</p>\n<ul>\n<li><strong>Message</strong>: <code>\"{\\n \\\"on\\\": {\\n \\\"type\\\" : \\\"command\\\",\\n \\\"value\\\" : \\\"\\\"\\n }\\n}\"</code>. This sends the 'on' command.</li>\n<li><strong>URL</strong>: <code>\"http://localhost:1026/v2/entities/Lamp:\"+sensor.id.takeRight(3)+\"/attrs\"</code>. <code>takeRight(3)</code> gets the number of\nthe room (for example '001').</li>\n<li><strong>Content Type</strong>: <code>ContentType.JSON</code>.</li>\n<li><strong>HTTP Method</strong>: <code>HTTPMethod.PATCH</code>.</li>\n<li><strong>Headers</strong>: <code>Map(\"fiware-service\" -&gt; \"openiot\",\"fiware-servicepath\" -&gt; \"/\",\"Accept\" -&gt; \"*/*\")</code>. Optional parameter.\nThese are the headers needed in the HTTP request.</li>\n</ul>\n","event":[{"listen":"prerequest","script":{"id":"c38a5871-639b-4f83-8326-2d0c6e247a87","type":"text/javascript","exec":[""]}},{"listen":"test","script":{"id":"7c1b3690-4a40-419d-9e4f-dd8f3acde622","type":"text/javascript","exec":[""]}}],"_postman_id":"1f21c8c7-351b-4803-a2d6-f18542618756"}],"event":[{"listen":"prerequest","script":{"id":"e3ee3d45-9b02-4548-b269-b1d13f810c9f","type":"text/javascript","exec":[""]}},{"listen":"test","script":{"id":"b18ca78d-898a-4efc-97cd-95e8072bee01","type":"text/javascript","exec":[""]}}],"variable":[{"key":"orion","value":"localhost:1026"},{"key":"subscriptionId","value":"5e134a0c924f6d7d27b63844"}]}