Run a Stream Application with Pipeline API

Objectives: Set up and run a stream application with the Pipeline API. The application forwards streaming messages to a versioned layer.

Complexity: Easy

Time to complete: 30 min

Depends on: Run a Stream Application Locally, Organize your work in projects.

This example demonstrates how to set up and run a Flink application that reads streaming messages from an input catalog. The application writes the message content into the versioned layer of the catalog that you created in the Organize your work in projects example. Whereas you ran the application locally in Run a Stream Application Locally, this example runs it with the Pipeline API.

Build the Fat JAR

The source code that you run locally is identical to the code that you run in the Workspace. You can reuse the same project that you used in Run a Stream Application Locally.

Build the fat JAR for your application to upload and run in the Workspace.

mvn -Pplatform clean package

This assembles the fat JAR using the platform profile defined in the sdk-stream-bom parent POM. This profile contains the Flink-specific dependencies and their canonical versions for the HERE Workspace, along with the rules for resolving version conflicts.
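
Because the profile lives in the parent POM, your own pom.xml only needs to declare that parent. A minimal sketch of the inheritance is shown below; the groupId is an assumption, and the version is a placeholder to fill in with the SDK version you are using:

```xml
<!-- Sketch: inherit the platform profile from the SDK stream BOM.
     The groupId shown is an assumption; the version is a placeholder. -->
<parent>
  <groupId>com.here.platform</groupId>
  <artifactId>sdk-stream-bom</artifactId>
  <version>{{SDK_VERSION}}</version>
</parent>
```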

Create, Activate, and Delete Pipelines

Create a file named pipeline-config.conf and populate it with the contents below, replacing {{YOUR_CATALOG_HRN}} with the HRN of the catalog you created in Organize your work in projects. You also used this catalog in Run a Stream Application Locally.

pipeline.config {

  // Replace this with the HRN of your catalog.
  output-catalog {hrn = "{{YOUR_CATALOG_HRN}}"}

  input-catalogs {
    // Use hrn:here-cn:data::olp-cn-here:sample-data in the China environment
    sensorData {hrn = "hrn:here:data::olp-here:olp-sdii-sample-berlin-2"}
  }
}
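
Rather than editing the file by hand, you can generate it from a shell variable. A minimal sketch follows; the CATALOG_HRN value below is a placeholder, not a real catalog:

```shell
# Sketch: generate pipeline-config.conf from a variable so the catalog HRN
# is not hard-coded. CATALOG_HRN here is a placeholder value.
CATALOG_HRN="hrn:here:data::example:my-archive-catalog"

cat > pipeline-config.conf <<EOF
pipeline.config {

  output-catalog {hrn = "${CATALOG_HRN}"}

  input-catalogs {
    sensorData {hrn = "hrn:here:data::olp-here:olp-sdii-sample-berlin-2"}
  }
}
EOF
```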

If you have not already done so, download and unzip the Java and Scala examples and the CLI. You can run OLP CLI from the tools/OLP_CLI folder of the unzipped file.

This tutorial uses a public catalog, the SDII Sensor Data Sample Catalog. First, link the catalog to your project. To do this, replace {{YOUR_PROJECT_HRN}} with your project HRN and run the following command:

olp project resources link {{YOUR_PROJECT_HRN}} hrn:here:data::olp-here:olp-sdii-sample-berlin-2

The CLI should return the following message:


Project resource hrn:here:data::olp-here:olp-sdii-sample-berlin-2 has been linked.

Using {{YOUR_PROJECT_HRN}} from the Organize your work in projects tutorial, create a pipeline:

olp pipeline create {{YOUR_USERNAME}}-archivestream --scope {{YOUR_PROJECT_HRN}}

The CLI returns your {{YOUR_PIPELINE_ID}}:

Pipeline has been created.
ID: 3e1e2d82-ee41-4625-a67e-09f7355063ee

Next, create a pipeline template, setting --scope equal to {{YOUR_PROJECT_HRN}}:

Scala:

olp pipeline template create {{YOUR_USERNAME}}-archivestream-template \
    stream-3.0.0 {{PATH_TO_YOUR_FAT_JAR}} \
    ArchiveStreamScala \
    --input-catalog-ids pipeline-config.conf \
    --scope {{YOUR_PROJECT_HRN}}

Java:

olp pipeline template create {{YOUR_USERNAME}}-archivestream-template \
    stream-3.0.0 {{PATH_TO_YOUR_FAT_JAR}} \
    ArchiveStreamJava \
    --input-catalog-ids pipeline-config.conf \
    --scope {{YOUR_PROJECT_HRN}}

The CLI returns {{YOUR_PIPELINE_TEMPLATE_ID}} such as:

Pipeline template has been created.
ID: 2a74fe67-6d8e-4f19-b168-cbeac5b53d83

You can now create a pipeline version with this conf file:

olp pipeline version create {{YOUR_USERNAME}}-archivestream-version \
    {{YOUR_PIPELINE_ID}} \
    {{YOUR_PIPELINE_TEMPLATE_ID}} \
    ./pipeline-config.conf \
    --scope {{YOUR_PROJECT_HRN}}

The CLI returns {{YOUR_PIPELINE_VERSION_ID}}:

Pipeline version has been created
ID: e7a2028d-2606-4c42-8300-69cb18fdddbe
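
When scripting these steps, you can capture each returned ID instead of copying it by hand. A minimal sketch, assuming the `ID: <uuid>` output format shown in the transcripts above; the piped sample output stands in for a real olp invocation:

```shell
# Sketch: capture the ID printed by the create commands.
# Assumes output of the form shown above, e.g. "ID: e7a2028d-...".
extract_id() { awk '/^ID:/ {print $2}'; }

# In a real script you would pipe the olp command itself, for example:
#   PIPELINE_ID=$(olp pipeline create ... --scope ... | extract_id)
# Here the sample transcript above stands in for the CLI:
PIPELINE_VERSION_ID=$(printf 'Pipeline version has been created\nID: e7a2028d-2606-4c42-8300-69cb18fdddbe\n' | extract_id)
echo "$PIPELINE_VERSION_ID"
```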

To run the pipeline, activate it:

olp pipeline version activate {{YOUR_PIPELINE_ID}} {{YOUR_PIPELINE_VERSION_ID}} --scope {{YOUR_PROJECT_HRN}}

The CLI indicates successful activation:

Pipeline version has been activated
Current state: scheduled

Once it is running, the pipeline publishes a new version of the output catalog after uploading partitions to it for one minute. You can inspect the output in the HERE portal with the Data Inspector tool. To do this, navigate to the portal, find {{YOUR_CATALOG_HRN}} in the list and open this catalog. Select the sdii-message-archive layer to inspect it.

Manually refresh the page to update the Versions field once the app has published the new version. Next, go to the Inspect tab and select a tile to visualize it on the map and view its decoded data in the right-hand panel.

As implemented, the application deliberately throws an exception once the new version has been successfully published, so that it does not continue processing streaming input indefinitely. The following CLI command waits until the pipeline reaches the expected failed state caused by this halting exception:

olp pipeline version wait {{YOUR_PIPELINE_ID}} {{YOUR_PIPELINE_VERSION_ID}} \
    --job-state failed --timeout 1200 --scope {{YOUR_PROJECT_HRN}}

Once the pipeline terminates, the CLI lists the output:

Details of the ae43cc9c-8aa9-42d5-9fc0-7f858a02dd48 job:
created                                 2018-06-28T09:45:33.305Z
updated                                 2018-06-28T09:51:16.064Z
state                                   failed
logging URL                             ...

If you want to see how this application would behave in a real production case, you can comment out the line that throws the exception. In this case, the application continually publishes a new version of the output catalog once per minute until you manually cancel the pipeline to stop the stream processing.

Scala:

// If you comment out this line, the application will continue to publish a new version
// to the output catalog once per minute, until you explicitly kill the application if
// running locally, or cancel the pipeline version if running on a pipeline.
throw new InterruptedException("Halting stream processing")

Java:

// If you comment out this line, the application will continue to publish a new
// version to the output catalog once per minute, until you explicitly kill
// the application if running locally, or cancel the pipeline version if
// running on a pipeline.
throw new InterruptedException("Halting stream processing");

If you have jq installed, you can process the JSON returned by the OLP CLI to get the logging URL for the running application. For more details on using jq with the OLP CLI in your scripts, see Scripting Tips.

The following command obtains a URL under https://splunk.metrics.platform.here.com/, at which you can inspect and run queries over the logs of your application.

olp pipeline version list {{YOUR_PIPELINE_ID}} --json --scope {{YOUR_PROJECT_HRN}} \
    | jq -r '.pipelineVersion.applications[] | select(.state | contains("running")) | .loggingUrl'
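
If jq is not available, a python3 one-liner can extract the same field. The JSON shape below is an assumption modeled on the jq filter above, and versions.json stands in for the real output of the list command:

```shell
# Sketch: sample JSON standing in for the CLI output (shape is an assumption).
cat > versions.json <<'EOF'
{"pipelineVersion": {"applications": [
  {"state": "running", "loggingUrl": "https://splunk.metrics.platform.here.com/example"}
]}}
EOF

# Extract the logging URL of the running application without jq:
python3 - <<'PYEOF'
import json

with open("versions.json") as f:
    data = json.load(f)

for app in data["pipelineVersion"]["applications"]:
    if "running" in app["state"]:
        print(app["loggingUrl"])
PYEOF
```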

You can also manage your pipeline from the platform portal. To do this, navigate to the portal and select {{YOUR_PIPELINE_ID}} to open its details page.

If you commented out the halting exception above, you must manually cancel the pipeline to stop the stream processing; otherwise, it continues to upload and publish new partitions to your output catalog. Use the following CLI command:

olp pipeline version cancel {{YOUR_PIPELINE_ID}} {{YOUR_PIPELINE_VERSION_ID}} --scope {{YOUR_PROJECT_HRN}}

If your command is successful, the CLI indicates the following:

Pipeline version has been canceled.
State: ready

The pipeline, pipeline template, and pipeline version remain available for reuse. Once you have finished with them and no longer intend to reuse them, delete them as follows.

olp pipeline version delete {{YOUR_PIPELINE_ID}} {{YOUR_PIPELINE_VERSION_ID}} --scope {{YOUR_PROJECT_HRN}}

If your command is successful, the CLI returns the following:

Pipeline version {{YOUR_PIPELINE_VERSION_ID}} has been deleted.

Delete the pipeline:

olp pipeline delete {{YOUR_PIPELINE_ID}} --scope {{YOUR_PROJECT_HRN}}

The CLI indicates the following:

Pipeline with the id {{YOUR_PIPELINE_ID}} has been deleted.

Delete the pipeline template:

olp pipeline template delete {{YOUR_PIPELINE_TEMPLATE_ID}} --scope {{YOUR_PROJECT_HRN}}

The CLI indicates the following:

Pipeline template {{YOUR_PIPELINE_TEMPLATE_ID}} has been deleted.

Further Information

For more details on Pipeline API via OLP CLI, see the Command Line Interface Developer Guide.
