{"info":{"_postman_id":"6260b900-e6c8-436d-8759-2cbfd977b231","name":"NGSI-LD Real-time Processing (Flink)","description":"<html><head></head><body><p>This tutorial is an introduction to the <a href=\"http://fiware-cosmos-flink.rtfd.io\">FIWARE Cosmos Orion Flink Connector</a>, which\nfacilitates Big Data analysis of context data, through an integration with <a href=\"https://flink.apache.org/\">Apache Flink</a>,\none of the most popular Big Data platforms. Apache Flink is a framework and distributed processing engine for stateful\ncomputations both over unbounded and bounded data streams. Flink has been designed to run in all common cluster\nenvironments, perform computations at in-memory speed and at any scale.</p>\n<p>The <code>docker-compose</code> file for this tutorial can be found on GitHub: </p>\n<p><img src=\"https://fiware.github.io/tutorials.Big-Data-Flink/icon/GitHub-Mark-32px.png\" alt=\"GitHub\"> <a href=\"https://github.com/FIWARE/tutorials.Big-Data-Flink\">FIWARE-LD 305: Big Data Analysis (Flink) </a></p>\n<h1 id=\"real-time-processing-and-big-data-analysis\">Real-time Processing and Big Data Analysis</h1>\n<blockquote>\n<p>\"Who controls the past controls the future: who controls the present controls the past.\"</p>\n<p>— George Orwell. \"1984\"</p>\n</blockquote>\n<p>Smart solutions based on FIWARE are architecturally designed around microservices. They are therefore designed to\nscale up from simple applications (such as the Supermarket tutorial) through to city-wide installations based on a large\narray of IoT sensors and other context data providers.</p>\n<p>The massive amount of data involved eventually becomes too much for a single machine to analyse, process and store, and\ntherefore the work must be delegated to additional distributed services. These distributed systems form the basis of\nso-called <strong>Big Data Analysis</strong>. 
The distribution of tasks allows developers to extract insights from huge\ndata sets which would be too complex to deal with using traditional methods, and to uncover hidden patterns and\ncorrelations.</p>\n<p>As we have seen, context data is core to any Smart Solution, and the Context Broker is able to monitor changes of state\nand raise <a href=\"https://github.com/Fiware/tutorials.Subscriptions\">subscription events</a> as the context changes. For smaller\ninstallations, each subscription event can be processed one-by-one by a single receiving endpoint; however, as the system\ngrows, another technique will be required to avoid overwhelming the listener, potentially blocking resources and missing\nupdates.</p>\n<p><strong>Apache Flink</strong> is a Java/Scala-based stream-processing framework which enables the delegation of data-flow processes.\nTherefore, additional computational resources can be called upon to deal with data as events arrive. The <strong>Cosmos Flink</strong>\nconnector allows developers to write custom business logic to listen for context data subscription events and then process\nthe flow of the context data. Flink is able to delegate these actions to other workers, where they will be acted upon\neither sequentially or in parallel as required. The data flow processing itself can be arbitrarily complex.</p>\n<p>Obviously, in reality, our existing Supermarket scenario is far too small to require the use of a Big Data solution, but\nwill serve as a basis for demonstrating the type of real-time processing which may be required in a larger solution\nwhich is processing a continuous stream of context-data events.</p>\n<h1 id=\"architecture\">Architecture</h1>\n<p>This application builds on the components and dummy IoT devices created in\n<a href=\"https://github.com/FIWARE/tutorials.IoT-Agent/\">previous tutorials</a>. 
It will make use of three FIWARE components - the\n<a href=\"https://fiware-orion.readthedocs.io/en/latest/\">Orion Context Broker</a>, the\n<a href=\"https://fiware-iotagent-ul.readthedocs.io/en/latest/\">IoT Agent for Ultralight 2.0</a>, and the\n<a href=\"https://fiware-cosmos-flink.readthedocs.io/en/latest/\">Cosmos Orion Flink Connector</a> for connecting Orion to an\n<a href=\"https://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html\">Apache Flink cluster</a>. The Flink cluster\nitself will consist of a single <strong>JobManager</strong> <em>master</em> to coordinate execution and a single <strong>TaskManager</strong> <em>worker</em> to\nexecute the tasks.</p>\n<p>Both the Orion Context Broker and the IoT Agent rely on open source <a href=\"https://www.mongodb.com/\">MongoDB</a> technology to\nkeep persistence of the information they hold. We will also be using the dummy IoT devices created in the\n<a href=\"https://github.com/FIWARE/tutorials.IoT-Agent/\">previous tutorial</a>.</p>\n<p>Therefore the overall architecture will consist of the following elements:</p>\n<ul>\n<li>Two <strong>FIWARE Generic Enablers</strong> as independent microservices:<ul>\n<li>The <a href=\"https://fiware-orion.readthedocs.io/en/latest/\">Orion Context Broker</a> which will receive requests using\n<a href=\"https://forge.etsi.org/swagger/ui/?url=https://forge.etsi.org/gitlab/NGSI-LD/NGSI-LD/raw/master/spec/updated/full_api.json\">NGSI-LD</a></li>\n<li>The FIWARE <a href=\"https://fiware-iotagent-ul.readthedocs.io/en/latest/\">IoT Agent for UltraLight 2.0</a> which will\nreceive southbound requests using\n<a href=\"https://forge.etsi.org/swagger/ui/?url=https://forge.etsi.org/gitlab/NGSI-LD/NGSI-LD/raw/master/spec/updated/full_api.json\">NGSI-LD</a>\nand convert them to\n<a href=\"https://fiware-iotagent-ul.readthedocs.io/en/latest/usermanual/index.html#user-programmers-manual\">UltraLight 2.0</a>\ncommands for the devices</li>\n</ul>\n</li>\n<li>An <a 
href=\"https://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html\">Apache Flink cluster</a> consisting\nof a single <strong>JobManager</strong> and a single <strong>TaskManager</strong><ul>\n<li>The FIWARE <a href=\"https://fiware-cosmos-flink.readthedocs.io/en/latest/\">Cosmos Orion Flink Connector</a> will be\ndeployed as part of the dataflow which will subscribe to context changes and perform operations on them in\nreal time</li>\n</ul>\n</li>\n<li>One <a href=\"https://www.mongodb.com/\">MongoDB</a> <strong>database</strong>:<ul>\n<li>Used by the <strong>Orion Context Broker</strong> to hold context data information such as data entities, subscriptions and\nregistrations</li>\n<li>Used by the <strong>IoT Agent</strong> to hold device information such as device URLs and Keys</li>\n</ul>\n</li>\n<li>The <strong>Tutorial Application</strong> does the following:<ul>\n<li>Offers static <code>@context</code> files defining the context entities within the system.</li>\n<li>Acts as a set of dummy <a href=\"https://github.com/FIWARE/tutorials.IoT-Sensors/tree/NGSI-LD\">agricultural IoT devices</a>\nusing the\n<a href=\"https://fiware-iotagent-ul.readthedocs.io/en/latest/usermanual/index.html#user-programmers-manual\">UltraLight 2.0</a>\nprotocol running over HTTP.</li>\n</ul>\n</li>\n</ul>\n<p>The overall architecture can be seen below:</p>\n<p><img src=\"https://fiware.github.io/tutorials.Big-Data-Flink/img/architecture.png\" alt=\"\"></p>\n<p>Since all interactions between the elements are initiated by HTTP requests, the entities can be containerized and run\nfrom exposed ports.</p>\n<p>The configuration information of the Apache Flink cluster can be seen in the <code>jobmanager</code> and <code>taskmanager</code> sections of\nthe associated <code>docker-compose.yml</code> file:</p>\n<h2 id=\"flink-cluster-configuration\">Flink Cluster Configuration</h2>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code 
class=\"language-yaml\">jobmanager:\n    image: flink:1.9.0-scala_2.11\n    hostname: jobmanager\n    container_name: flink-jobmanager\n    expose:\n        - \"8081\"\n        - \"9001\"\n    ports:\n        - \"6123:6123\"\n        - \"8081:8081\"\n    command: jobmanager\n    environment:\n        - JOB_MANAGER_RPC_ADDRESS=jobmanager\n</code></pre>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-yaml\">taskmanager:\n    image: flink:1.9.0-scala_2.11\n    hostname: taskmanager\n    container_name: flink-taskmanager\n    ports:\n        - \"6121:6121\"\n        - \"6122:6122\"\n        - \"9001:9001\"\n    depends_on:\n        - jobmanager\n    command: taskmanager\n    links:\n        - \"jobmanager:jobmanager\"\n    environment:\n        - JOB_MANAGER_RPC_ADDRESS=jobmanager\n</code></pre>\n<p>The <code>jobmanager</code> container is listening on two ports:</p>\n<ul>\n<li>Port <code>8081</code> is exposed so we can see the web frontend of the Apache Flink Dashboard</li>\n<li>Port <code>6123</code> is the standard <strong>JobManager</strong> RPC port, used for internal communications</li>\n</ul>\n<p>The <code>taskmanager</code> container is listening on two ports:</p>\n<ul>\n<li>Ports <code>6121</code> and <code>6122</code> are used as RPC ports by the <strong>TaskManager</strong> for internal communications</li>\n<li>Port <code>9001</code> is exposed so that the installation can receive context data subscriptions</li>\n</ul>\n<p>The containers within the Flink cluster are driven by a single environment variable as shown:</p>\n<div class=\"click-to-expand-wrapper is-table-wrapper\"><table>\n<thead>\n<tr>\n<th>Key</th>\n<th>Value</th>\n<th>Description</th>\n</tr>\n</thead>\n<tbody>\n<tr>\n<td>JOB_MANAGER_RPC_ADDRESS</td>\n<td><code>jobmanager</code></td>\n<td>URL of the <em>master</em> Job Manager which coordinates the task processing</td>\n</tr>\n</tbody>\n</table>\n</div><h1 id=\"start-up\">Start Up</h1>\n<p>Before you 
start, you should ensure that you have obtained or built the necessary Docker images locally. Please clone\nthe repository and create the necessary images by running the commands shown below. Note that you might need to run some\nof the commands as a privileged user:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-console\">git clone https://github.com/FIWARE/tutorials.Big-Data-Flink.git\ncd tutorials.Big-Data-Flink\ngit checkout NGSI-LD\n./services create\n</code></pre>\n<p>This command will also import seed data from the previous tutorials and provision the dummy IoT sensors on startup.</p>\n<p>To start the system, run the following command:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-console\">./services start\n</code></pre>\n<blockquote>\n<p>:information_source: <strong>Note:</strong> If you want to clean up and start over again you can do so with the following command:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code>./services stop\n</code></pre></blockquote>\n<h1 id=\"real-time-processing-operations\">Real-time Processing Operations</h1>\n<p>Dataflow within <strong>Apache Flink</strong> is defined within the\n<a href=\"https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/programming-model.html\">Flink documentation</a> as\nfollows:</p>\n<blockquote>\n<p>\"The basic building blocks of Flink programs are streams and transformations. Conceptually a stream is a (potentially\nnever-ending) flow of data records, and a transformation is an operation that takes one or more streams as input, and\nproduces one or more output streams as a result.</p>\n<p>When executed, Flink programs are mapped to streaming dataflows, consisting of streams and transformation operators.\nEach dataflow starts with one or more sources and ends in one or more sinks. The dataflows resemble arbitrary directed\nacyclic graphs (DAGs). 
Although special forms of cycles are permitted via iteration constructs, for the most part this\ncan be glossed over for simplicity.\"</p>\n</blockquote>\n<p><img src=\"https://fiware.github.io/tutorials.Big-Data-Flink/img/streaming-dataflow.png\" alt=\"\"></p>\n<p>This means that to create a streaming data flow we must supply the following:</p>\n<ul>\n<li>A mechanism for reading Context data as a <strong>Source Operator</strong></li>\n<li>Business logic to define the transform operations</li>\n<li>A mechanism for pushing Context data back to the context broker as a <strong>Sink Operator</strong></li>\n</ul>\n<p>The <code>orion.flink.connector-1.2.4.jar</code> offers both <strong>Source</strong> and <strong>Sink</strong> operations. It therefore only remains to write\nthe necessary Scala code to connect the streaming dataflow pipeline operations together. The processing code can be\ncompiled into a JAR file which can be uploaded to the Flink cluster. Two examples will be detailed below; all the source\ncode for this tutorial can be found within the\n<a href=\"https://github.com/FIWARE/tutorials.Big-Data-Flink/tree/master/cosmos-examples\">cosmos-examples</a> directory.</p>\n<p>Further Flink processing examples can be found on the\n<a href=\"https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started\">Apache Flink site</a> and\n<a href=\"https://fiware-cosmos-flink-examples.readthedocs.io/\">Flink Connector Examples</a>.</p>\n<h3 id=\"compiling-a-jar-file-for-flink\">Compiling a JAR file for Flink</h3>\n<p>An existing <code>pom.xml</code> file has been created which holds the necessary prerequisites to build the examples JAR file.</p>\n<p>In order to use the Orion Flink Connector we first need to manually install the connector JAR as an artifact using\nMaven:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-console\">cd cosmos-examples\ncurl -LO 
https://github.com/ging/fiware-cosmos-orion-flink-connector/releases/download/1.2.4/orion.flink.connector-1.2.4.jar\nmvn install:install-file \\\n  -Dfile=./orion.flink.connector-1.2.4.jar \\\n  -DgroupId=org.fiware.cosmos \\\n  -DartifactId=orion.flink.connector \\\n  -Dversion=1.2.4 \\\n  -Dpackaging=jar\n</code></pre>\n<p>Thereafter the source code can be compiled by running the <code>mvn package</code> command within the same directory\n(<code>cosmos-examples</code>):</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-console\">mvn package\n</code></pre>\n<p>A new JAR file called <code>cosmos-examples-1.2.jar</code> will be created within the <code>cosmos-examples/target</code> directory.</p>\n<h3 id=\"generating-a-stream-of-context-data\">Generating a stream of Context Data</h3>\n<p>For the purpose of this tutorial, we must be monitoring a system in which the context is periodically being updated. The\ndummy IoT Sensors can be used to do this. Open the device monitor page at <code>http://localhost:3000/device/monitor</code> and\nstart a tractor moving. This can be done by selecting an appropriate command from\nthe drop-down list and pressing the <code>send</code> button. The stream of measurements coming from the devices can then be seen\non the same page:</p>\n<p><img src=\"https://fiware.github.io/tutorials.Big-Data-Flink/img/farm-devices.png\" alt=\"\"></p>\n<h2 id=\"logger---reading-context-data-streams\">Logger - Reading Context Data Streams</h2>\n<p>The first example makes use of the <code>NGSILDSource</code> operator in order to receive notifications from the Orion-LD Context\nBroker. Specifically, the example counts the number of notifications that each type of device sends in one minute. 
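</p>
<p>The counting logic at the heart of this example can be sketched in plain Scala (a simplified, Flink-free illustration: <code>CountSketch</code> is a hypothetical helper operating on an in-memory batch of entity types rather than on a real <code>DataStream</code>):</p>
<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-scala\">// Sketch only: mirrors the Logger pipeline - map each entity type to\n// Sensor(type, 1), group by type, then sum the counts.\ncase class Sensor(device: String, sum: Int)\n\nobject CountSketch {\n  def countByType(batch: Seq[String]): Map[String, Sensor] =\n    batch\n      .map(t =&gt; Sensor(t, 1))\n      .groupBy(_.device)\n      .map { case (device, hits) =&gt; device -&gt; Sensor(device, hits.map(_.sum).sum) }\n}\n</code></pre>
<p>Applied to a batch such as <code>Seq(\"Tractor\", \"Device\", \"Device\")</code> this yields a count of <code>1</code> for <code>Tractor</code> and <code>2</code> for <code>Device</code> - the same shape as the <code>Sensor(type, count)</code> records printed by the real job. </p>
<p>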
You can\nfind the source code of the example in\n<a href=\"https://github.com/FIWARE/tutorials.Big-Data-Flink/blob/NGSI-LD/cosmos-examples/src/main/scala/org/fiware/cosmos/tutorial/LoggerLD.scala\">org/fiware/cosmos/tutorial/LoggerLD.scala</a></p>\n<h3 id=\"logger---installing-the-jar\">Logger - Installing the JAR</h3>\n<p>Open the browser and access <code>http://localhost:8081/#/submit</code></p>\n<p><img src=\"https://fiware.github.io/tutorials.Big-Data-Flink/img/submit-logger.png\" alt=\"\"></p>\n<p>Submit new job</p>\n<ul>\n<li><strong>Filename:</strong> <code>cosmos-examples-1.2.jar</code></li>\n<li><strong>Entry Class:</strong> <code>org.fiware.cosmos.tutorial.LoggerLD</code></li>\n</ul>\n<p>An alternative would be to use curl on the command-line as shown:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-console\">curl -X POST -H \"Expect:\" -F \"jarfile=@/cosmos-examples-1.2.jar\" http://localhost:8081/jars/upload\n</code></pre>\n</body></html>","schema":"https://schema.getpostman.com/json/collection/v2.0.0/collection.json","toc":[{"content":"Real-time Processing and Big Data Analysis","slug":"real-time-processing-and-big-data-analysis"},{"content":"Architecture","slug":"architecture"},{"content":"Start Up","slug":"start-up"},{"content":"Real-time Processing Operations","slug":"real-time-processing-operations"}],"owner":"513743","collectionId":"6260b900-e6c8-436d-8759-2cbfd977b231","publishedId":"TWDUpxjm","public":true,"customColor":{"top-bar":"FFFFFF","right-sidebar":"303030","highlight":"FF6C37"},"publishDate":"2024-05-29T15:39:52.000Z"},"item":[{"name":"Receiving context data and performing operations","item":[{"name":"Orion - Subscribe to Context Changes","id":"d910f76a-f518-444b-8818-3e4404a0435e","protocolProfileBehavior":{"disableBodyPruning":true},"request":{"method":"POST","header":[{"key":"Content-Type","value":"application/ld+json"},{"key":"NGSILD-Tenant","value":"openiot"}],"body":{"mode":"raw","raw":"{\n  
\"description\": \"Notify Flink of all animal and farm vehicle movements\",\n  \"type\": \"Subscription\",\n  \"entities\": [{\"type\": \"Tractor\"}, {\"type\": \"Device\"}],\n  \"watchedAttributes\": [\"location\"],\n  \"notification\": {\n    \"attributes\": [\"location\"],\n    \"format\": \"normalized\",\n    \"endpoint\": {\n      \"uri\": \"http://taskmanager:9001\",\n      \"accept\": \"application/json\"\n    }\n  },\n   \"@context\": \"http://context-provider:3000/data-models/ngsi-context.jsonld\"\n}"},"url":"http://localhost:1026/ngsi-ld/v1/subscriptions/","description":"<p>Once a dynamic context system is up and running (we have deployed the <code>Logger</code> job in the Flink cluster), we need to\ninform <strong>Flink</strong> of changes in context.</p>\n<p>This is done by making a POST request to the <code>/ngsi-ld/v1/subscriptions</code> endpoint of the Orion Context Broker.</p>\n<ul>\n<li><p>The <code>NGSILD-Tenant</code> header is used to filter the subscription to only listen to measurements from the attached IoT Sensors, since they had been provisioned using these settings</p>\n</li>\n<li><p>The notification <code>uri</code> must match the one our Flink program is listening to.</p>\n</li>\n<li><p>The <code>throttling</code> value, if present, defines the rate at which changes are sampled.</p>\n</li>\n</ul>\n<p>The response will be <strong>201 - Created</strong>.</p>\n","urlObject":{"protocol":"http","path":["ngsi-ld","v1","subscriptions",""],"host":["localhost:1026"],"query":[],"variable":[]}},"response":[],"_postman_id":"d910f76a-f518-444b-8818-3e4404a0435e"},{"name":"Orion - Check Subscription is working","id":"00d68da3-f9dc-4e35-b940-2f492d7dfdd7","protocolProfileBehavior":{"disableBodyPruning":true},"request":{"method":"GET","header":[{"key":"NGSILD-Tenant","value":"openiot"}],"url":"http://localhost:1026/ngsi-ld/v1/subscriptions/","description":"<p>If a subscription has been created, we can check to see if it is firing by making a GET request to 
the\n<code>/ngsi-ld/v1/subscriptions/</code> endpoint.</p>\n<p>Within the <code>notification</code> section of the response, you can see several additional <code>attributes</code> which describe the health\nof the subscription.</p>\n<p>If the criteria of the subscription have been met, <code>timesSent</code> should be greater than <code>0</code>. A zero value would indicate\nthat the <code>subject</code> of the subscription is incorrect or the subscription has been created with the wrong\n<code>NGSILD-Tenant</code> header.</p>\n<p>The <code>lastNotification</code> should be a recent timestamp - if this is not the case, then the devices are not regularly\nsending data. Remember to activate the smart farm by moving a <strong>Tractor</strong>.</p>\n<p>The <code>lastSuccess</code> should match the <code>lastNotification</code> date - if this is not the case then <strong>Cosmos</strong> is not receiving\nthe subscription properly. Check that the hostname and port are correct.</p>\n<p>Finally, check that the <code>status</code> of the subscription is <code>active</code> - an expired subscription will not fire.</p>\n","urlObject":{"protocol":"http","path":["ngsi-ld","v1","subscriptions",""],"host":["localhost:1026"],"query":[],"variable":[]}},"response":[],"_postman_id":"00d68da3-f9dc-4e35-b940-2f492d7dfdd7"}],"id":"28d282bc-5cf6-47b1-bcbe-5d7f058f19ed","description":"<p>The first example makes use of the <code>NGSILDSource</code> operator in order to receive notifications from the Orion-LD Context\nBroker. Specifically, the example counts the number of notifications that each type of device sends in one minute. 
You can\nfind the source code of the example in\n<a href=\"https://github.com/FIWARE/tutorials.Big-Data-Flink/blob/NGSI-LD/cosmos-examples/src/main/scala/org/fiware/cosmos/tutorial/LoggerLD.scala\">org/fiware/cosmos/tutorial/LoggerLD.scala</a></p>\n<h3 id=\"logger---installing-the-jar\">Logger - Installing the JAR</h3>\n<p>Open the browser and access <code>http://localhost:8081/#/submit</code></p>\n<p><img src=\"https://fiware.github.io/tutorials.Big-Data-Flink/img/submit-logger.png\" alt /></p>\n<p>Submit new job</p>\n<ul>\n<li><strong>Filename:</strong> <code>cosmos-examples-1.2.jar</code></li>\n<li><strong>Entry Class:</strong> <code>org.fiware.cosmos.tutorial.LoggerLD</code></li>\n</ul>\n<p>An alternative would be to use curl on the command-line as shown:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-console\">curl -X POST -H \"Expect:\" -F \"jarfile=@/cosmos-examples-1.2.jar\" http://localhost:8081/jars/upload\n</code></pre>\n<h3 id=\"logger---checking-the-output\">Logger - Checking the Output</h3>\n<p>Leave the subscription running for <strong>one minute</strong>, then run the following:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-console\">docker logs flink-taskmanager -f --until=60s &gt; stdout.log 2&gt;stderr.log\ncat stderr.log\n</code></pre>\n<p>After creating the subscription, the output on the console will be like the following:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-text\">Sensor(Tractor,19)\nSensor(Device,49)\n</code></pre>\n<h3 id=\"logger---analyzing-the-code\">Logger - Analyzing the Code</h3>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-scala\">package org.fiware.cosmos.tutorial\n\n\nimport org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}\nimport org.apache.flink.streaming.api.windowing.time.Time\nimport 
org.fiware.cosmos.orion.flink.connector.NGSILDSource\n\nobject LoggerLD {\n\n  def main(args: Array[String]): Unit = {\n    val env = StreamExecutionEnvironment.getExecutionEnvironment\n    // Create Orion Source. Receive notifications on port 9001\n    val eventStream = env.addSource(new NGSILDSource(9001))\n\n    // Process event stream\n    val processedDataStream = eventStream\n      .flatMap(event =&gt; event.entities)\n      .map(entity =&gt; new Sensor(entity.`type`, 1))\n      .keyBy(\"device\")\n      .timeWindow(Time.seconds(60))\n      .sum(1)\n\n    // print the results with a single thread, rather than in parallel\n    processedDataStream.printToErr().setParallelism(1)\n    env.execute(\"Socket Window NgsiLDEvent\")\n  }\n}\ncase class Sensor(device: String, sum: Int)\n</code></pre>\n<p>The first lines of the program import the necessary dependencies, including the connector. The next step\nis to create an instance of the <code>NGSILDSource</code> using the class provided by the connector and to add it to the environment\nprovided by Flink.</p>\n<p>The <code>NGSILDSource</code> constructor accepts a port number (<code>9001</code>) as a parameter. This port is used to listen for the\nsubscription notifications coming from the context broker, which are converted into a <code>DataStream</code> of <code>NgsiEventLD</code> objects. The definition of\nthese objects can be found within the\n<a href=\"https://github.com/ging/fiware-cosmos-orion-flink-connector/blob/master/README.md#NGSILDSource\">Orion-Flink Connector documentation</a>.</p>\n<p>The stream processing consists of five separate steps. The first step (<code>flatMap()</code>) is performed in order to put\ntogether the entity objects of all the NGSI-LD Events received in a period of time. Thereafter the code iterates over them\n(with the <code>map()</code> operation) and extracts the desired attributes. 
In this case, we are interested in the entity <code>type</code>\n(<code>Device</code> or <code>Tractor</code>).</p>\n<p>Within each iteration, we create a custom object with the properties we need: the sensor <code>type</code> and the increment of\neach notification. For this purpose, we can define a case class as shown:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-scala\">case class Sensor(device: String, sum: Int)\n</code></pre>\n<p>Thereafter we can group the created objects by the type of device (<code>keyBy(\"device\")</code>) and perform operations such as\n<code>timeWindow()</code> and <code>sum()</code> on them.</p>\n<p>After the processing, the results are output (on the standard error stream) to the console:</p>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-scala\">processedDataStream.printToErr().setParallelism(1)\n</code></pre>\n","event":[{"listen":"prerequest","script":{"id":"67d9b6c0-a360-46fb-8d29-2d7ab406d0c9","type":"text/javascript","exec":[""]}},{"listen":"test","script":{"id":"8b6a685f-b206-49a5-a531-64cf43d56258","type":"text/javascript","exec":[""]}}],"_postman_id":"28d282bc-5cf6-47b1-bcbe-5d7f058f19ed"},{"name":"Receiving context data, performing operations and persisting context data","item":[{"name":"Orion - Subscribe to Context Changes","id":"e0733f7a-8b9b-413d-98cf-bb970c2b8334","protocolProfileBehavior":{"disableBodyPruning":true},"request":{"method":"POST","header":[{"key":"Content-Type","value":"application/ld+json"},{"key":"NGSILD-Tenant","value":"openiot"}],"body":{"mode":"raw","raw":"{\n  \"description\": \"Notify Flink of changes of Soil Humidity\",\n  \"type\": \"Subscription\",\n  \"entities\": [{\"type\": \"SoilSensor\"}],\n  \"watchedAttributes\": [\"humidity\"],\n  \"notification\": {\n    \"attributes\": [\"humidity\"],\n    \"format\": \"normalized\",\n    \"endpoint\": {\n      \"uri\": \"http://flink-taskmanager:9001\",\n      \"accept\": \"application/json\"\n    }\n  },\n   
\"@context\": \"http://context-provider:3000/data-models/ngsi-context.jsonld\"\n}"},"url":"http://localhost:1026/ngsi-ld/v1/subscriptions/","description":"<p>A new subscription needs to be set up to run this example. The subscription is listening for changes of context on the soil humidity sensor.</p>\n","urlObject":{"protocol":"http","path":["ngsi-ld","v1","subscriptions",""],"host":["localhost:1026"],"query":[],"variable":[]}},"response":[],"_postman_id":"e0733f7a-8b9b-413d-98cf-bb970c2b8334"},{"name":"Orion - Check Subscription is working","id":"4024ffe3-6b1e-4cc7-82eb-8e7d71f99af0","protocolProfileBehavior":{"disableBodyPruning":true},"request":{"method":"GET","header":[{"key":"NGSILD-Tenant","value":"openiot"}],"url":"http://localhost:1026/ngsi-ld/v1/subscriptions/","description":"<p>If a subscription has been created, we can check to see if it is firing by making a GET request to the\n<code>/ngsi-ld/v1/subscriptions/</code> endpoint.</p>\n","urlObject":{"protocol":"http","path":["ngsi-ld","v1","subscriptions",""],"host":["localhost:1026"],"query":[],"variable":[]}},"response":[],"_postman_id":"4024ffe3-6b1e-4cc7-82eb-8e7d71f99af0"},{"name":"Orion - Delete Subscription","id":"c75dd55e-5600-49dc-bb03-ef6054a81140","protocolProfileBehavior":{"disableBodyPruning":true},"request":{"method":"DELETE","header":[{"key":"NGSILD-Tenant","value":"openiot"}],"body":{"mode":"raw","raw":""},"url":"http://localhost:1026/ngsi-ld/v1/subscriptions/5e134a0c924f6d7d27b63844","description":"<p>If necessary, an old subscription can be deleted by referencing its subscription id.</p>\n","urlObject":{"protocol":"http","path":["ngsi-ld","v1","subscriptions","5e134a0c924f6d7d27b63844"],"host":["localhost:1026"],"query":[],"variable":[]}},"response":[],"_postman_id":"c75dd55e-5600-49dc-bb03-ef6054a81140"}],"id":"76a38925-f64d-45e7-b958-e3070cea86fe","description":"<p>The second example turns on a water faucet when the soil humidity is too low and turns it back off when the soil humidity 
 is back to normal levels.</p>\n<p>The dataflow stream uses the <code>NGSILDSource</code> operator in order to receive notifications, filters the input so that it only responds to soil sensors, and then uses the <code>NGSILDSink</code> to push processed context back to the Context Broker. You can find the source code of the example in\n<a href=\"https://github.com/FIWARE/tutorials.Big-Data-Flink/blob/master/cosmos-examples/src/main/scala/org/fiware/cosmos/tutorial/FeedbackLD.scala\">org/fiware/cosmos/tutorial/FeedbackLD.scala</a></p>\n<h3 id=\"feedback-loop---installing-the-jar\">Feedback Loop - Installing the JAR</h3>\n<p>Go to <code>http://localhost:8081/#/job/running</code></p>\n<p><img src=\"https://fiware.github.io/tutorials.Big-Data-Flink/img/running-jobs.png\" alt /></p>\n<p>Select the running job (if any) and click on <strong>Cancel Job</strong></p>\n<p>Thereafter go to <code>http://localhost:8081/#/submit</code></p>\n<p><img src=\"https://fiware.github.io/tutorials.Big-Data-Flink/img/submit-feedback.png\" alt /></p>\n<p>Submit new job</p>\n<ul>\n<li><strong>Filename:</strong> <code>cosmos-examples-1.2.jar</code></li>\n<li><strong>Entry Class:</strong> <code>org.fiware.cosmos.tutorial.FeedbackLD</code></li>\n</ul>\n<h3 id=\"feedback-loop---checking-the-output\">Feedback Loop - Checking the Output</h3>\n<p>Go to <code>http://localhost:3000/device/monitor</code></p>\n<p>Raise the temperature in Farm001 until the humidity value drops below 35; the water faucet will then be turned on automatically to increase the soil humidity. 
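</p>
<p>The decision logic in this feedback loop can be sketched as a pure function (plain Scala, no Flink dependencies; <code>FaucetSketch</code> is a hypothetical helper, but the thresholds match the <code>LOW_THRESHOLD</code> and <code>HIGH_THRESHOLD</code> constants used by the job):</p>
<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-scala\">// Sketch only: decide which faucet command (if any) a humidity reading triggers.\nobject FaucetSketch {\n  final val LOW_THRESHOLD = 35\n  final val HIGH_THRESHOLD = 50\n\n  def command(humidity: Int): Option[String] =\n    if (humidity &lt; LOW_THRESHOLD) Some(\"on\")        // too dry - open the faucet\n    else if (humidity &gt; HIGH_THRESHOLD) Some(\"off\") // wet enough - close it\n    else None                                       // within range - do nothing\n}\n</code></pre>
<p>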
When the humidity rises above 50, the water faucet will be turned off automatically as well.</p>\n<h3 id=\"feedback-loop---analyzing-the-code\">Feedback Loop - Analyzing the Code</h3>\n<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-scala\">package org.fiware.cosmos.tutorial\n\n\nimport org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}\nimport org.fiware.cosmos.orion.flink.connector._\n\nobject FeedbackLD {\n  final val CONTENT_TYPE = ContentType.JSON\n  final val METHOD = HTTPMethod.PATCH\n  final val CONTENT = \"{\\n  \\\"type\\\" : \\\"Property\\\",\\n  \\\"value\\\" : \\\" \\\" \\n}\"\n  final val HEADERS = Map(\n    \"NGSILD-Tenant\" -&gt; \"openiot\",\n    \"Link\" -&gt; \"&lt;http://context-provider:3000/data-models/ngsi-context.jsonld&gt;; rel=\\\"http://www.w3.org/ns/json-ld#context\\\"; type=\\\"application/ld+json\\\"\"\n  )\n  final val LOW_THRESHOLD = 35\n  final val HIGH_THRESHOLD = 50\n\n  def main(args: Array[String]): Unit = {\n    val env = StreamExecutionEnvironment.getExecutionEnvironment\n    // Create Orion Source. 
Receive notifications on port 9001\n    val eventStream = env.addSource(new NGSILDSource(9001))\n    // Process event stream\n    val processedDataStream = eventStream.flatMap(event =&gt; event.entities)\n      .filter(ent =&gt; ent.`type` == \"SoilSensor\")\n\n    /* High humidity */\n\n    val highHumidity = processedDataStream\n      .filter(ent =&gt;  (ent.attrs(\"humidity\") != null) &amp;&amp; (ent.attrs(\"humidity\")(\"value\").asInstanceOf[BigInt] &gt; HIGH_THRESHOLD))\n      .map(ent =&gt; (ent.id,ent.attrs(\"humidity\")(\"value\")))\n\n    val highSinkStream= highHumidity.map(sensor =&gt; {\n      OrionSinkObject(CONTENT,\"http://orion:1026/ngsi-ld/v1/entities/urn:ngsi-ld:Device:water\"+sensor._1.takeRight(3)+\"/attrs/off\",CONTENT_TYPE,METHOD,HEADERS)\n    })\n\n    highHumidity.map(sensor =&gt; \"Sensor\" + sensor._1 + \" has detected a humidity level above \" + HIGH_THRESHOLD + \". Turning off water faucet!\").print()\n    OrionSink.addSink( highSinkStream )\n\n\n    /* Low humidity */\n    val lowHumidity = processedDataStream\n      .filter(ent =&gt; (ent.attrs(\"humidity\") != null) &amp;&amp; (ent.attrs(\"humidity\")(\"value\").asInstanceOf[BigInt] &lt; LOW_THRESHOLD))\n      .map(ent =&gt; (ent.id,ent.attrs(\"humidity\")(\"value\")))\n\n    val lowSinkStream= lowHumidity.map(sensor =&gt; {\n      OrionSinkObject(CONTENT,\"http://orion:1026/ngsi-ld/v1/entities/urn:ngsi-ld:Device:water\"+sensor._1.takeRight(3)+\"/attrs/on\",CONTENT_TYPE,METHOD,HEADERS)\n    })\n\n    lowHumidity.map(sensor =&gt; \"Sensor\" + sensor._1 + \" has detected a humidity level below \" + LOW_THRESHOLD + \". Turning on water faucet!\").print()\n    OrionSink.addSink( lowSinkStream )\n\n    env.execute(\"Socket Window NgsiEvent\")\n  }\n\n  case class Sensor(id: String)\n}\n</code></pre>\n<p>As you can see, it is similar to the previous example. 
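</p>
<p>Before looking at the <code>OrionSinkObject</code> arguments in detail, note how the target URL of each command is assembled from the sensor <code>id</code>. A minimal sketch of this string manipulation (plain Scala; <code>UrlSketch</code> is a hypothetical helper name):</p>
<pre class=\"click-to-expand-wrapper is-snippet-wrapper\"><code class=\"language-scala\">// Sketch only: build the faucet command URL from a sensor id, using the same\n// concatenation and takeRight(3) call as the sink streams above.\nobject UrlSketch {\n  def faucetUrl(sensorId: String, command: String): String =\n    \"http://orion:1026/ngsi-ld/v1/entities/urn:ngsi-ld:Device:water\" +\n      sensorId.takeRight(3) + \"/attrs/\" + command\n}\n</code></pre>
<p>For example, <code>UrlSketch.faucetUrl(\"urn:ngsi-ld:SoilSensor:001\", \"on\")</code> targets the <code>on</code> attribute of <code>urn:ngsi-ld:Device:water001</code>. </p>
<p>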
The main difference is that it writes the processed data back to the Context Broker through the <strong><code>OrionSink</code></strong>.</p>\n<p>The arguments of the <strong><code>OrionSinkObject</code></strong> are:</p>\n<ul>\n<li><strong>Message</strong>: <code>\"{\\n  \\\"type\\\" : \\\"Property\\\",\\n  \\\"value\\\" : \\\" \\\" \\n}\"</code>.</li>\n<li><strong>URL</strong>: <code>\"http://orion:1026/ngsi-ld/v1/entities/urn:ngsi-ld:Device:water\"+sensor._1.takeRight(3)+\"/attrs/on\"</code> or <code>\"http://orion:1026/ngsi-ld/v1/entities/urn:ngsi-ld:Device:water\"+sensor._1.takeRight(3)+\"/attrs/off\"</code>, depending on whether we are turning the water faucet on or off. <code>takeRight(3)</code> gets the number of\nthe sensor, for example '001'.</li>\n<li><strong>Content Type</strong>: <code>ContentType.JSON</code>.</li>\n<li><strong>HTTP Method</strong>: <code>HTTPMethod.PATCH</code>.</li>\n<li><strong>Headers</strong>: <code>Map(\"NGSILD-Tenant\" -&gt; \"openiot\", \"Link\" -&gt; \"&lt;http://context-provider:3000/data-models/ngsi-context.jsonld&gt;; rel=\\\"http://www.w3.org/ns/json-ld#context\\\"; type=\\\"application/ld+json\\\"\" )</code>.\nWe add the headers we need in the HTTP Request.</li>\n</ul>\n","event":[{"listen":"prerequest","script":{"id":"f0d07caf-4bb9-4b60-b5bb-541cd2ba243a","type":"text/javascript","exec":[""]}},{"listen":"test","script":{"id":"301e962f-c9c0-4715-86dc-7de3ea6884b4","type":"text/javascript","exec":[""]}}],"_postman_id":"76a38925-f64d-45e7-b958-e3070cea86fe"}],"event":[{"listen":"prerequest","script":{"id":"08f48b68-8ba3-42af-b2f1-cb0d0718bf10","type":"text/javascript","exec":[""]}},{"listen":"test","script":{"id":"1546c666-be89-4bfa-8b60-9f22444651f0","type":"text/javascript","exec":[""]}}],"variable":[{"key":"orion","value":"localhost:1026"},{"key":"subscriptionId","value":"5e134a0c924f6d7d27b63844"}]}