Run a Stream Application Locally

Objectives: Set up and run a stream application locally that reads streaming messages and writes them to a versioned layer.

Complexity: Easy

Time to complete: 30 min

Depends on: Organize your work in projects

Source code: Download

This example demonstrates how to set up and run a local Flink application that reads streaming messages from an input catalog. The Flink application writes the message content into the versioned layer of the catalog that you created in the Organize your work in projects example.

Set up the Maven Project

Create the following folder structure for the project:

archive-stream
└── src
    └── main
        ├── java
        ├── resources
        └── scala

You can do this with a single bash command:

mkdir -p archive-stream/src/main/{java,resources,scala}

Create a file named pipeline-config.conf, and populate it with the contents below, replacing {{YOUR_CATALOG_HRN}} with the HRN of the catalog you created in Organize your work in projects.

pipeline.config {

  // Replace this with the HRN of your catalog.
  output-catalog {hrn = "{{YOUR_CATALOG_HRN}}"}

  input-catalogs {
    // In the China environment, use hrn:here-cn:data::olp-cn-here:sample-data instead.
    sensorData {hrn = "hrn:here:data::olp-here:olp-sdii-sample-berlin-2"}
  }
}

We will use the SDII Sensor Data Sample Catalog as the input catalog, so we need to link it to the project. To do this, replace {{YOUR_PROJECT_HRN}} with the HRN of your project in the following command and execute it:


olp project resources link {{YOUR_PROJECT_HRN}} hrn:here:data::olp-here:olp-sdii-sample-berlin-2

The CLI should return the following message:


Project resource hrn:here:data::olp-here:olp-sdii-sample-berlin-2 has been linked.
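
To confirm the link, you can list the resources linked to your project. Assuming your OLP CLI version provides this command, the sample catalog should appear in its output:

olp project resources list {{YOUR_PROJECT_HRN}}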

The POM for this example is identical to that in the first Maven example, except for its parent and dependencies sections:

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-stream-bom</artifactId>
    <version>2.19.10</version>
    <relativePath/>
</parent>

Scala
Java
<dependencies>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>flink-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
</dependencies>
<dependencies>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>flink-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
</dependencies>
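
The ${scala.compat.version} placeholder above is a Maven property that is not defined by this snippet; as in the first Maven example, it must be set in the POM's properties section. A minimal sketch, with a value that depends on your SDK release (2.12 is assumed here):

<properties>
    <scala.compat.version>2.12</scala.compat.version>
</properties>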

Implement the Stream Reading Application

The Scala and Java implementations below use the flink-support module of the Data Client Library to receive streaming messages from a sample streaming layer and copy their contents into an output versioned layer:

Scala
Java
/*
 * Copyright (c) 2018-2020 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN
import com.here.platform.data.client.flink.scaladsl.FlinkReadEngine
import com.here.platform.data.client.flink.scaladsl.{
  FlinkDataClient,
  FlinkPublishApi,
  FlinkWriteEngine
}
import com.here.platform.data.client.model.VersionDependency
import com.here.platform.data.client.scaladsl.{CommitPartition, NewPartition, Partition}
import com.here.platform.data.client.settings.{ConsumerSettings, LatestOffset}
import com.here.platform.pipeline.PipelineContext
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.function.RichAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object ArchiveStreamScala extends App {

  // Uploads partitions to blobstore.
  class UploadDataFunction(val inputHRN: HRN, val outputHRN: HRN)
      extends RichMapFunction[Partition, CommitPartition]
      with Serializable {

    @transient
    private lazy val flinkDataClient: FlinkDataClient =
      new FlinkDataClient()

    override def close(): Unit =
      flinkDataClient.terminate()

    @transient
    private lazy val readEngine: FlinkReadEngine =
      flinkDataClient.readEngine(inputHRN)

    @transient
    private lazy val writeApi: FlinkWriteEngine =
      flinkDataClient.writeEngine(outputHRN)

    def map(partition: Partition): CommitPartition = {

      val data = readEngine.getDataAsBytes(partition)

      val newPartition = NewPartition(
        partition = partition.partition,
        layer = "sdii-message-archive",
        data = NewPartition.ByteArrayData(data)
      )
      writeApi.put(newPartition)
    }
  }

  // Window function that publishes a new batch version for all CommitPartitions in the window.
  class PublishBatchWindowFunction(hrn: HRN)
      extends RichAllWindowFunction[CommitPartition, String, TimeWindow]
      with Serializable {
    @transient
    private lazy val flinkDataClient: FlinkDataClient =
      new FlinkDataClient()

    @transient
    private lazy val publishApi: FlinkPublishApi =
      flinkDataClient.publishApi(hrn)

    override def close(): Unit =
      flinkDataClient.terminate()

    override def apply(window: TimeWindow,
                       partitions: Iterable[CommitPartition],
                       out: Collector[String]): Unit = {
      val baseVersion = publishApi.getBaseVersion()

      // Note that we do not log any versioned dependencies, because we are only processing streaming input.
      publishApi.publishBatch(baseVersion, Seq.empty[VersionDependency], partitions.iterator)
      out.collect(s"commit on $baseVersion success")
    }
  }

  override def main(args: Array[String]): Unit = {

    val pipelineContext = new PipelineContext
    val sensorDataHRN = pipelineContext.config.inputCatalogs("sensorData")
    val outputHRN = pipelineContext.config.outputCatalog

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val flinkDataClient = new FlinkDataClient
    val queryApi = flinkDataClient.queryApi(sensorDataHRN)

    val streamingLayer = "sample-streaming-layer"

    val subscriptionFunction = queryApi.subscribe(
      streamingLayer,
      ConsumerSettings(groupName = "archive-stream-consumer", offset = LatestOffset))

    // Subscribe to streaming sensor data messages.
    val partitions: DataStream[Partition] = env.addSource(subscriptionFunction)

    try {
      partitions

      // Upload incoming streaming sensor data messages to the blobstore for the versioned layer.
        .map(new UploadDataFunction(sensorDataHRN, outputHRN))

        // Publishes a new layer version once per minute.
        .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
        .apply(new PublishBatchWindowFunction(outputHRN))
        .addSink(_ => {
          /// [halt-processing]
          // If you comment out this line, the application will continue to publish a new version
          // to the output catalog once per minute, until you explicitly kill the application if
          // running locally, or cancel the pipeline version if running on a pipeline.
          throw new InterruptedException("Halting stream processing")
          /// [halt-processing]
        })

      env.execute()
    } catch {
      case ex: Exception => ()
    }

    subscriptionFunction.cancel()
    flinkDataClient.terminate()
  }
}

/*
 * Copyright (c) 2018-2020 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN;
import com.here.platform.data.client.flink.javadsl.*;
import com.here.platform.data.client.javadsl.CommitPartition;
import com.here.platform.data.client.javadsl.NewPartition;
import com.here.platform.data.client.javadsl.Partition;
import com.here.platform.data.client.settings.ConsumerSettings;
import com.here.platform.pipeline.PipelineContext;
import java.io.Serializable;
import java.util.*;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Uploads partitions to blobstore.
class UploadDataFunction extends RichMapFunction<Partition, CommitPartition>
    implements Serializable {
  private HRN sensorDataHRN;
  private HRN outputHRN;
  private transient FlinkDataClient dataClient;
  private transient FlinkReadEngine readEngine;
  private transient FlinkWriteEngine writeEngine;

  public UploadDataFunction(HRN sensorDataHRN, HRN outputHRN) {
    this.sensorDataHRN = sensorDataHRN;
    this.outputHRN = outputHRN;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    dataClient = new FlinkDataClient();
    readEngine = dataClient.readEngine(sensorDataHRN);
    writeEngine = dataClient.writeEngine(outputHRN);
  }

  @Override
  public void close() throws Exception {
    dataClient.terminate();
  }

  @Override
  public CommitPartition map(Partition partition) throws Exception {
    byte[] data = readEngine.getDataAsBytes(partition);
    NewPartition newPartition =
        new NewPartition.Builder()
            .withPartition(partition.getPartition())
            .withLayer("sdii-message-archive")
            .withData(data)
            .build();
    return writeEngine.put(newPartition);
  }
}

// Window function that publishes a new batch version for all CommitPartitions in the window.
class PublishBatchWindowFunction extends RichAllWindowFunction<CommitPartition, String, TimeWindow>
    implements Serializable {
  private HRN hrn;
  private transient FlinkDataClient dataClient;
  private transient FlinkPublishApi publishApi;

  public PublishBatchWindowFunction(HRN hrn) {
    this.hrn = hrn;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    dataClient = new FlinkDataClient();
    publishApi = dataClient.publishApi(hrn);
  }

  @Override
  public void close() throws Exception {
    dataClient.terminate();
  }

  @Override
  public void apply(
      TimeWindow window, Iterable<CommitPartition> commitPartitions, Collector<String> out)
      throws Exception {
    OptionalLong baseVersion = publishApi.getBaseVersion();

    // Note that we do not log any versioned dependencies, because we are only processing streaming
    // input.
    publishApi.publishBatch(baseVersion, Collections.emptyList(), commitPartitions.iterator());
    out.collect("commit on " + baseVersion + " success");
  }
}

public class ArchiveStreamJava {

  public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

    PipelineContext pipelineContext = new PipelineContext();
    HRN sensorDataHRN = pipelineContext.getConfig().getInputCatalogs().get("sensorData");
    HRN outputHRN = pipelineContext.getConfig().getOutputCatalog();

    String streamingLayer = "sample-streaming-layer";

    FlinkDataClient flinkDataClient = new FlinkDataClient();

    FlinkQueryApi queryApi = flinkDataClient.queryApi(sensorDataHRN);

    ConsumerSettings consumerSettings =
        new ConsumerSettings.Builder()
            .withLatestOffset()
            .withGroupName("archive-stream-consumer")
            .build();

    SourceFunction<Partition> subscriptionFunction =
        queryApi.subscribe(streamingLayer, consumerSettings);

    // Subscribe to streaming sensor data messages.
    DataStream<Partition> partitions = env.addSource(subscriptionFunction);

    try {
      partitions
          // Upload incoming streaming sensor data messages to the blobstore for the versioned
          // layer.
          .map(new UploadDataFunction(sensorDataHRN, outputHRN))

          // Publishes a new layer version once per minute.
          .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
          .apply(new PublishBatchWindowFunction(outputHRN))
          .addSink(
              new SinkFunction<String>() {
                @Override
                public void invoke(String value, Context context) throws Exception {
                  /// [halt-processing]
                  // If you comment out this line, the application will continue to publish a new
                  // version to the output catalog once per minute, until you explicitly kill
                  // the application if running locally, or cancel the pipeline version if
                  // running on a pipeline.
                  throw new InterruptedException("Halting stream processing");
                  /// [halt-processing]
                }
              });

      env.execute();
    } catch (Exception ex) {
    }

    subscriptionFunction.cancel();
    flinkDataClient.terminate();
  }
}

You can place your configuration files in the resources folder to control, for example, the logging level of the application's output and the size of the request queues for the Data Services.

This resources/application.conf file forwards logging events from Akka to an SLF4J logger and increases the size of the request queues for the config and streaming services:

akka {
  // This is the minimum log level for events forwarded by akka to the slf4j logger.
  // The slf4j backend configuration can define a higher log level in its configuration (for instance, in log4j.properties).
  loglevel = "DEBUG"
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  logger-startup-timeout = 1m
}

here.platform.data-client {
  // Raises the number of open requests for the queryApi on the source catalog.
  config {
    request-executor {
      akka.http.host-connection-pool {
        max-open-requests = 16
        max-connections = 16
      }
    }
  }
  // The size of the request queue for the streaming Data Service can be configured here.
  stream {
    request-executor {
      akka.http.host-connection-pool {
        max-open-requests = 256
        max-connections = 256
      }
    }
    connector {
      consumer = "kafka-connector"
    }
  }
}
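
Because these settings are read through Typesafe Config, you can usually override an individual value with a JVM system property instead of editing the file, assuming the default behavior of letting system properties take precedence. For example, appending -Dakka.loglevel=INFO to the run command shown in the next section lowers the Akka log level for a single local run:

mvn compile exec:java -Dexec.mainClass=ArchiveStreamScala -Dpipeline-config.file=pipeline-config.conf \
        -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}} \
        -Dakka.loglevel=INFO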

You can configure the logging levels in this resources/log4j.properties file:

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{1} - %m%n

# Logging levels (DEBUG, INFO, WARN, ERROR) can be configured here.
log4j.logger.akka.http=DEBUG
log4j.logger.com.here.platform.data.client=INFO

Compile and Run Locally

To run the application locally, execute the following command:

Run Scala
Run Java

mvn compile exec:java -Dexec.mainClass=ArchiveStreamScala -Dpipeline-config.file=pipeline-config.conf \
        -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
        

mvn compile exec:java -Dexec.mainClass=ArchiveStreamJava -Dpipeline-config.file=pipeline-config.conf \
        -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
        

The application publishes a new version of the output catalog after uploading partitions to the catalog for one minute. You can inspect the output in the platform portal. To do this, find {{YOUR_CATALOG_HRN}} in the portal's catalog list and open the catalog. Select the sdii-message-archive layer to inspect its data.

Manually refresh the page to update the Versions: field once the app has published the new version. Next, go to the Inspect tab and select a tile to visualize it on the map and view its decoded data in the right-hand panel.
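
If you prefer the command line, you can also verify the catalog there. Assuming your OLP CLI version supports the catalog show command and the --scope option, the following should display the catalog's configuration, including the sdii-message-archive layer:

olp catalog show {{YOUR_CATALOG_HRN}} --scope {{YOUR_PROJECT_HRN}}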

Note

To keep this tutorial focused on the raw mechanics of interacting with the Flink Data Client, the sample code here is a simplified implementation and is not illustrative of a real-world use case.

A normal production application could also decode the content of the streaming messages in order to bucket them into useful partitions. For example, the coordinates of an event within a message could be correlated to a specific HERE tile on a map.
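
For illustration only, the following hypothetical sketch shows that idea in Java: it buckets an event's WGS84 coordinates into a coarse grid cell and uses the cell as a partition key. This is not the HERE tile scheme, and none of these names come from the Data Client Library; a real application would compute proper HERE tile keys with the platform's tiling utilities.

// Hypothetical helper, for illustration only: derives a partition key by bucketing
// an event's coordinates into a fixed-size grid. A production application would
// compute HERE tile keys instead of this ad-hoc grid.
public final class GridPartitioner {

  /** Returns a partition key for the grid cell containing the given WGS84 coordinate. */
  public static String partitionKey(double latitude, double longitude, double cellSizeDegrees) {
    long row = (long) Math.floor((latitude + 90.0) / cellSizeDegrees);
    long col = (long) Math.floor((longitude + 180.0) / cellSizeDegrees);
    return row + "_" + col;
  }

  public static void main(String[] args) {
    // Example: an event recorded in central Berlin, bucketed into 0.1-degree cells.
    System.out.println(partitionKey(52.52, 13.405, 0.1)); // prints 1425_1934
  }
}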

Further, a typical application to archive streaming messages would use the Data Archiving Library to process their content and write the results to an index layer. See "Further Information" below for more on archiving.

As implemented, the application deliberately throws an exception after successfully publishing the new version to prevent it from continuing to process streaming input indefinitely. To see how this application behaves in a real production case, comment out the line that throws the exception. In this case, the application continually publishes a new version of the output catalog once per minute until you manually cancel the pipeline to stop the stream processing.

Scala
Java
// If you comment out this line, the application will continue to publish a new version
// to the output catalog once per minute, until you explicitly kill the application if
// running locally, or cancel the pipeline version if running on a pipeline.
throw new InterruptedException("Halting stream processing")
// If you comment out this line, the application will continue to publish a new
// version to the output catalog once per minute, until you explicitly kill
// the application if running locally, or cancel the pipeline version if
// running on a pipeline.
throw new InterruptedException("Halting stream processing");

Further Information

You can learn how to run this streaming application with the Pipeline API in Run a Stream Application with Pipeline API.

You can learn how to archive streaming messages into index layers with the Data Archiving Library.
