Build a Batch Pipeline with Maven Archetypes (Scala)

To build a batch pipeline using the Data Processing Library, we use the SDK Maven Archetypes to create a skeleton for the project. The HERE platform portal is used to manage credentials, create a catalog, and manage access rights.

In this example, we want to create a pipeline that reads the Road Topology & Geometry layer in the HERE Map Content catalog and then writes the number of segment references (cardinality) for every topology node in each input partition.
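Conceptually, the per-partition computation is simple. The following self-contained Scala sketch illustrates the idea, using hypothetical Node and SegmentRef stand-ins rather than the real HERE Map Content types:

```scala
// Hypothetical, simplified stand-ins for the topology types found in a
// Road Topology & Geometry partition; the real HERE Map Content bindings
// are introduced later in this guide.
case class SegmentRef(id: String)
case class Node(identifier: String, segmentRefs: Seq[SegmentRef])

// For every node in a partition, the output records the number of segment
// references attached to that node (its cardinality).
def nodeCardinalities(nodes: Seq[Node]): Map[String, Int] =
  nodes.map(node => node.identifier -> node.segmentRefs.size).toMap
```

The rest of this guide builds the project infrastructure needed to run this computation at scale against real catalog data.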

Credentials

You require two types of credentials:

  1. Platform Credentials - These credentials provide access to the platform API. First, create a new application at https://platform.here.com/profile/access-credentials. Once the application is created, click Create a key to download the credentials. By default, the Data Processing Library looks for credentials in the $HOME/.here/credentials.properties file. Make sure your credentials file is placed in this location.
  2. Repository Credentials - These credentials enable you to access the repository where the Data Processing Library is hosted. Go to https://platform.here.com/profile/repository and click Generate Credentials. This downloads the settings.xml file. Copy this file to the $HOME/.m2/ folder.

Create the Output Catalog

First we create a new catalog to serve as the output catalog for our pipeline. The catalog has one layer where, for each partition of the Road Topology & Geometry layer, there is a partition containing the cardinalities of the topology nodes in that partition. We also need one additional layer, state, which is reserved for the Data Processing Library.

Log into the HERE platform and select the Data tab. Carry out the following steps:

  1. Click Add new catalog.
  2. Specify a CATALOG NAME and a CATALOG ID for your catalog, such as batch-processing-quickstart-username.
  3. Next, add a CATALOG SUMMARY and a CONTACT EMAIL.
  4. Click Save and wait for the Data API to create your new catalog.

Then, we give our application read/write access to the catalog, as follows:

  1. Select your catalog by searching for its name in the Search for data box.
  2. Go to Sharing, and in Manage Sharing, select SHARE CATALOG WITH App. Insert your application ID and check read and write.
  3. Finally, click Grant to apply your changes.

We now have to add layers to the catalog:

  1. Click on Add new layer and create a layer with node-cardinality as its ID. You can use node-cardinality as the layer's name too, or choose a different, human-readable name.
  2. For Partitioning, select HERE Tile with zoom level 12, the same as the input Road Topology & Geometry layer. Select Versioned for Layer Type, which you must use for every layer processed by a batch pipeline.
  3. Keep the default Content Type of application/x-protobuf since we want to use Protocol Buffers to encode our partitions. Leave the Schema field set to None.
  4. Then, click Save to complete the layer creation.
  5. Proceed with a second layer, state, and configure it according to the second row of the following table, which lists the configuration of all layers in the catalog.
Layer ID          Partitioning  Zoom Level  Layer Type  Content Type              Schema
node-cardinality  HERE Tile     12          Versioned   application/x-protobuf    None
state             Generic       N/A         Versioned   application/octet-stream  None

The catalog is now fully configured and we can proceed with creating the project.

Create a Project

The Data SDK includes Maven Archetypes to simplify the process of creating new batch pipelines. Using the Maven Archetypes, we can build a complete project structure using a few shell commands. The archetype automatically generates POM files that include all of the basic dependencies, sample configuration files, and source files we can edit to implement our own logic. We need to create at least three projects:

  • a top-level project, for convenience, to compile all sub-projects with a single POM file
  • a nested Schema project, to build Java/Scala bindings for our Protocol Buffers schema
  • a Process project, to build the processing logic

The following steps assume that Maven is installed and the mvn executable is in your PATH. Run all of the commands below from a bash shell. The tree command is used to show folder structures; if tree is not available, you can use ls -R as a replacement.

First, create a top-level project named nodecardinality by running the following command; press ENTER when prompted to confirm:

$ pwd
~/projects

$ mvn archetype:generate -DarchetypeGroupId=org.codehaus.mojo.archetypes \
                         -DarchetypeArtifactId=pom-root \
                         -DarchetypeVersion=1.1 \
                         -DgroupId=com.example \
                         -DartifactId=nodecardinality \
                         -Dversion=1.0.0 \
                         -Dpackage=com.example.nodecardinality

This creates a nodecardinality folder in the current directory, containing the following files:

$ pwd
~/projects

$ tree
.
`-- nodecardinality
    `-- pom.xml

1 directory, 1 file

Sub-projects are created from within this folder, so navigate to the nodecardinality folder. First, create the Schema project by running the following command; press ENTER when prompted to confirm:

$ pwd
~/projects/nodecardinality

$ mvn archetype:generate -DarchetypeGroupId=com.here.platform.schema \
                         -DarchetypeArtifactId=project_archetype \
                         -DarchetypeVersion=X.Y.Z \
                         -DgroupId=com.example.nodecardinality \
                         -DartifactId=schema \
                         -Dversion=1.0.0 \
                         -Dpackage=com.example.nodecardinality.schema \
                         -DmajorVersion=0

Refer to the Archetypes Developer Guide for specific documentation about the latest version of the archetype included in the SDK.

This creates a project template in the nodecardinality/schema folder containing a project to build the schema for our output catalog:

$ pwd
~/projects/nodecardinality

$ tree
.
|-- pom.xml
`-- schema
    |-- ds
    |   |-- pom.xml
    |   `-- src
    |       |-- assembly
    |       |   `-- proto.xml
    |       `-- main
    |           `-- resources
    |               |-- ResourcesReadMe.txt
    |               `-- renderers
    |                   `-- ReadMe.txt
    |-- java
    |   `-- pom.xml
    |-- pom.xml
    |-- proto
    |   |-- pom.xml
    |   `-- src
    |       |-- assembly
    |       |   `-- proto.xml
    |       `-- main
    |           |-- proto
    |           |   `-- com
    |           |       `-- example
    |           |           `-- nodecardinality
    |           |               `-- schema
    |           |                   `-- v0
    |           |                       `-- schema.proto
    |           `-- resources
    |               `-- description.md
    |-- scala
    |   `-- pom.xml
    `-- schema.yml

20 directories, 13 files

Finally, still within the nodecardinality folder, run the following command to create a processor template including a Direct1ToNCompiler; press ENTER when prompted to confirm:

$ pwd
~/projects/nodecardinality

$ mvn archetype:generate -DarchetypeGroupId=com.here.platform \
                         -DarchetypeArtifactId=batch-direct1ton-scala-archetype \
                         -DarchetypeVersion=X.Y.Z \
                         -DgroupId=com.example.nodecardinality \
                         -DartifactId=processor \
                         -Dversion=1.0.0 \
                         -Dpackage=com.example.nodecardinality.processor

Refer to the Archetypes Developer Guide for specific documentation about the latest version of the archetype included in the SDK.

An additional processor folder is now added to the nodecardinality project:

$ pwd
~/projects/nodecardinality

$ tree
.
|-- pom.xml
|-- processor
|   |-- config
|   |   |-- pipeline-config.conf
|   |   `-- pipeline-job.conf
|   |-- pom.xml
|   `-- src
|       `-- main
|           |-- resources
|           |   |-- application.conf
|           |   `-- log4j.properties
|           `-- scala
|               `-- com
|                   `-- example
|                       `-- nodecardinality
|                           `-- processor
|                               |-- Compiler.scala
|                               |-- CompilerConfig.scala
|                               |-- IntermediateData.scala
|                               |-- LayersDef.scala
|                               `-- Main.scala
`-- schema
    |-- ds
    |   |-- pom.xml
    |   `-- src
    |       |-- assembly
    |       |   `-- proto.xml
    |       `-- main
    |           `-- resources
    |               |-- ResourcesReadMe.txt
    |               `-- renderers
    |                   `-- ReadMe.txt
    |-- java
    |   `-- pom.xml
    |-- pom.xml
    |-- proto
    |   |-- pom.xml
    |   `-- src
    |       |-- assembly
    |       |   `-- proto.xml
    |       `-- main
    |           |-- proto
    |           |   `-- com
    |           |       `-- example
    |           |           `-- nodecardinality
    |           |               `-- schema
    |           |                   `-- v0
    |           |                       `-- schema.proto
    |           `-- resources
    |               `-- description.md
    |-- scala
    |   `-- pom.xml
    `-- schema.yml

30 directories, 23 files

Schema Sub-project

The nodecardinality/schema folder contains the skeleton of a Maven project that builds Java and Scala libraries (usually referred to as bindings) to de/serialize partitions encoded as Protocol Buffers. We need this to encode partitions in the output node-cardinality layer as Protocol Buffers, and we want to specify a custom partition schema.

The project's folder contains:

  • the main POM file, pom.xml, used to compile the project
  • a java folder containing a POM file to build the Java bindings for the Protocol Buffers
  • a scala folder containing a POM file to build the Scala bindings for the Protocol Buffers
  • a ds folder containing a sub-project to bundle the resulting bindings and Protocol Buffer definitions in a ZIP file that can be published to the platform Artifactory repository, enabling the decoding of partitions from the Portal
  • a proto folder containing the Protocol Buffer definitions. This is the sub-project we are going to customize to specify the output schema.

To create a custom Protocol Buffer schema, we have to add .proto files to the nodecardinality/schema/proto/src folder. For more information on Protocol Buffers, refer to the Protocol Buffers Documentation.

The skeleton project we have just created already contains a .proto file we can edit to quickly define the schema of the output partitions.

Open the nodecardinality/schema/proto/src/main/proto/com/example/nodecardinality/schema/v0/schema.proto file and search for the main message definition:

syntax = "proto3";

package com.example.nodecardinality.schema.v0;

// Any dependent resources should be declared in the main pom, and the files referenced here:
//import "com/company/dependentGroupId/filename.proto";

// MainProtobufMessage is a placeholder, must match package/messagename in the mainMessage tag of the layer-manifest-plugin in the schema_ds module.
message MainProtobufMessage {
    int32 lat = 1;
    int32 lon = 2;
}

Let's change the name of the main message from MainProtobufMessage to NodeCardinalityPartition, remove the sample fields lat and lon, and add a repeated field named node_cardinality of type NodeCardinality.

Then, define an auxiliary message type NodeCardinality with two fields, the ID of the node (id) and the cardinality of the node (cardinality). This new Protocol Buffer definition looks like this:

syntax = "proto3";

package com.example.nodecardinality.schema.v0;

// Any dependent resources should be declared in the main pom, and the files referenced here:
//import "com/company/dependentGroupId/filename.proto";

message NodeCardinalityPartition {
  repeated NodeCardinality node_cardinality = 1;
}

message NodeCardinality {
  string id = 1;
  uint32 cardinality = 2;
}

Since we changed the name of the main message, we have to update the configuration used to build the Schema bundle. Open the POM file of the ds sub-project (nodecardinality/schema/ds/pom.xml) and locate the configuration for the layer-manifest-plugin:

      <!-- build the layer manifest file -->
      <plugin>
        <groupId>com.here.platform.schema.maven_plugins</groupId>
        <artifactId>layer-manifest-plugin</artifactId>
        <version>${here.plugin.version}</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>write-manifest</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <!-- Message class used to de/serialize data held in a DataStore layer.
               NOTE: If changed here, must correspond to a message in protocol buffers definitions. -->
          <mainMessage>com.example.nodecardinality.schema.v0.MainProtobufMessage</mainMessage>
          <inputDir>${project.build.directory}/proto</inputDir>
          <writeManifest>true</writeManifest>
        </configuration>
      </plugin>

Change the mainMessage path in the plugin configuration, replacing com.example.nodecardinality.schema.v0.MainProtobufMessage with com.example.nodecardinality.schema.v0.NodeCardinalityPartition:

      <!-- build the layer manifest file -->
      <plugin>
        <groupId>com.here.platform.schema.maven_plugins</groupId>
        <artifactId>layer-manifest-plugin</artifactId>
        <version>${here.plugin.version}</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>write-manifest</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <!-- Message class used to de/serialize data held in a DataStore layer.
               NOTE: If changed here, must correspond to a message in protocol buffers definitions. -->
          <mainMessage>com.example.nodecardinality.schema.v0.NodeCardinalityPartition</mainMessage>
          <inputDir>${project.build.directory}/proto</inputDir>
          <writeManifest>true</writeManifest>
        </configuration>
      </plugin>

We can now compile the Schema project by running mvn install from the nodecardinality/schema folder. Alternatively, we can also run this command from the top-level project.

This builds two libraries, schema_v0_java and schema_v0_scala_2.11, which provide APIs to create a NodeCardinalityPartition object, serialize it to a byte array, and deserialize it from a byte array.

We use schema_v0_scala_2.11 in our processing logic, to write the output partitions. To read the input partitions from the Road Topology & Geometry Layer, we use the corresponding Scala bindings, provided by com.here.schema.rib.topology-geometry_v2_scala instead.
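As a rough, hand-written illustration of the shape of those generated Scala bindings (the case classes below are stand-ins written for this sketch, not the actual generated code):

```scala
// Hand-written stand-ins with the same shape ScalaPB generates from
// schema.proto. The real generated case classes additionally provide
// toByteArray and parseFrom for the Protocol Buffers wire format.
case class NodeCardinality(id: String, cardinality: Int)
case class NodeCardinalityPartition(nodeCardinality: Seq[NodeCardinality])

// Building a partition value mirrors how the generated API is used in
// the compiler later on.
val partition = NodeCardinalityPartition(
  Seq(NodeCardinality("node-1", 3), NodeCardinality("node-2", 1)))
```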

In the next section, we add those dependencies to the processor's sub-project.

Processing Logic

The nodecardinality/processor folder contains the skeleton project of a batch processing pipeline. This project builds the final processing application.

In the project's folder we can find:

  • the main POM file pom.xml, used to compile the project;
  • a src folder containing the Scala source files implementing the logic;
  • a config folder containing configuration files that can be used to run the pipeline locally, outside the Pipeline API.

Pipeline Configuration

To perform a batch processing job, the Data Processing Library requires the following:

  1. HERE Resource Names (HRNs) for all the input catalogs and the output catalog.
  2. Versions for all the input catalogs for the batch job to process.

For pipelines in SCHEDULED state, this information is automatically provided by the Pipeline API via two configuration files in HOCON format:

  1. pipeline-config.conf, providing the HRN of the input catalogs and output catalog.
  2. pipeline-job.conf, providing the versions of the input catalogs to be processed.

You upload the first file to the platform pipeline once the pipeline is deployed; it never changes between compilations. The job configuration, on the other hand, is created on the fly by the Pipeline API whenever a new job needs to run, for example, when a new version of an input catalog exists and the output catalog needs to be updated.

During local development, when a batch pipeline is run locally without the Pipeline API, we must provide these configuration files ourselves, by setting two Java System Properties:

  1. pipeline-config.file, containing the path to the pipeline-config.conf file
  2. pipeline-job.file, containing the path to the pipeline-job.conf file

The nodecardinality/processor/config folder contains templates for both files, which we can edit and use for local development.

The pipeline configuration's template file (nodecardinality/processor/config/pipeline-config.conf) looks like this:


// This configuration file is provided to specify input and
// output catalogs for pipelines during local development.
// In addition, the CLI uses this file when uploading the pipeline
// to the platform to create the relevant configuration.
// For more information on the HERE platform, see the
// platform documentation at https://developer.here.com/olp/documentation.
//
// Do not include this file in the pipeline fat JAR you upload to the platform.
// When running your pipeline locally, use the command below.
//
// -Dpipeline-config.file=config/pipeline-config.conf

pipeline.config {

  output-catalog { hrn = "hrn:here:data:::myoutput" } // TODO: Specify the output catalog HRN.

  input-catalogs {
    // The following keys are symbolic IDs for catalogs
    // passed to the pipeline that the platform can use to
    // bind to and identify specific inputs. Use one line
    // for each input catalog in your project. Add and
    // delete lines as necessary.
    input-catalog-1 { hrn = "hrn:here:data:::myinput1" } // TODO: Specify the input catalog HRN.
    input-catalog-2 { hrn = "hrn:here:data:::myinput2" } // TODO: Specify the input catalog HRN.
  }
}

pipeline.config.output-catalog.hrn indicates the output catalog's HRN. To read the HRN of the catalog we just created, open the catalog in the Portal. If the catalog ID is batch-processing-quickstart, the corresponding HRN is hrn:here:data:::batch-processing-quickstart.
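Based on the example above, the mapping from a catalog ID to its HRN can be sketched as follows (the helper name is hypothetical and not part of any platform API):

```scala
// Illustrative only: for catalogs without a realm component, the HRN is
// derived from the catalog ID with the fixed "hrn:here:data:::" prefix,
// as in the example above. Catalogs in a realm, such as the HERE Map
// Content catalog hrn:here:data::olp-here:rib-2, carry the realm between
// the second and third colon group.
def catalogHrn(catalogId: String): String =
  s"hrn:here:data:::$catalogId"
```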

pipeline.config.input-catalogs contains the HRN of all input catalogs, indexed by arbitrary symbolic identifiers that identify a specific input catalog in the job configuration and in the processing logic. The template file contains two input catalogs, with identifiers input-catalog-1 and input-catalog-2, and their corresponding sample HRNs. In this project, we have only one input catalog, HERE Map Content, with the HRN hrn:here:data::olp-here:rib-2. Delete the two sample input catalogs and add one with the HRN mentioned above, using rib as its symbolic ID:


pipeline.config {

  output-catalog { hrn = "hrn:here:data:::batch-processing-quickstart" }

  input-catalogs {
    rib { hrn = "hrn:here:data::olp-here:rib-2" }
  }
}

The nodecardinality/processor/config/pipeline-job.conf file is a template for the job configuration. It contains the following:


// This configuration file is provided to facilitate local development
// of pipelines with the HERE Data SDK and
// for use with pipelines that are not scheduled to run
// in reaction to changes in input catalogs.
// For more information on the HERE platform, see the
// platform documentation at https://developer.here.com/olp/documentation.
//
// Do not include this file in the pipeline fat JAR you upload to the platform.
// When running your pipeline locally, use the command below.
//
// -Dpipeline-job.file=config/pipeline-job.conf

pipeline.job.catalog-versions {

  input-catalogs {

    // The following keys are symbolic IDs for catalogs
    // passed to the pipeline that the platform can use to
    // bind to and identify specific inputs.

    // Specify only one processing-type/version pair per input catalog.
    // Add an entry in the appropriate object (for example,
    // input-catalog-1, input catalog-2, input-catalog-3, ...)
    // for each input catalog in your project.
    // For more information on the processing type options, see
    // Build a Batch Pipeline with Maven Archetypes in the
    // Data Processing Library documentation.

    // Use case: Ignore results of previous compilation and fully process catalogs.
    // TODO: If you want to use incremental processing, comment out these sections.
    input-catalog-1 {
      processing-type = "reprocess"
      version = 0 // TODO: Specify the version of the catalog to be processed.
    }
    input-catalog-2 {
      processing-type = "reprocess"
      version = 2 // TODO: Specify the version of the catalog to be processed.
    }

    // Use case: Process catalogs incrementally. The processing
    // types below are not recommended for local development. They are
    // mostly used by platform pipelines to enable incremental compilation
    // with the Data Processing Library.
    // TODO: Comment out the section above, remove the comments below, and
    // specify the versions.
    //  input-catalog-1 {
    //    processing-type = "no_changes"
    //    version = 0 // TODO: Specify the version of the catalog to be processed.
    //  }
    //  input-catalog-2 {
    //    processing-type = "changes"
    //    since-version = 1 // TODO: Specify the correct starting version of the changeset to be processed.
    //    version = 4 // TODO: Specify the ending version of the changeset to be processed.
    //  }
  }
}

For each input catalog specified in pipeline-config.conf, the pipeline.job.catalog-versions.input-catalogs contains:

  • the version of the catalog to use for processing
  • the type of processing that the Data Processing Library uses for incremental compilation

The pipeline.job.catalog-versions.input-catalogs.input-catalog-ID.processing-type can have three different values to denote three types of processing:

  1. reprocess: this type indicates that the catalog should be fully processed; results from a previous compilation are not used to reduce the amount of data being processed. This is the simplest processing type to use when you are dealing with a manually written job configuration. It effectively disables incremental compilation, a feature of the Data Processing Library that reduces the amount of data processed by reusing the results of a previous compilation. With this type of processing, you must provide the version of the catalog to process.
  2. no_changes: this type indicates that we want to reuse the same version of the catalog used when the output catalog was last compiled. This type of processing lets the Data Processing Library skip some compilation steps. You must provide the version of the catalog to process, and a valid processing configuration requires this version to equal the version used in the last compilation. The Data Processing Library verifies this condition before processing starts; otherwise, incremental compilation is disabled.
  3. changes: this type indicates that we want to process a new version of the catalog (version) given the version processed in the last compilation (since-version). The Data Processing Library may use this type of processing to optimize processing and reduce the amount of data actually reprocessed. The processing configuration is valid as long as the version of the catalog used in the last compilation is indeed since-version; otherwise, incremental compilation is disabled.

It is important to understand that both no_changes and changes are only used to enable optimizations internally, in the Data Processing Library. Conceptually, for any input catalog, the Data Processing Library always fully processes a given version. For this quick start, and for local development, we rely exclusively on the reprocess processing type. Once a pipeline is deployed, it is the Pipeline API's duty to provide a valid job configuration that takes maximum advantage of the Data Processing Library's optimization capabilities.
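The validity rules for the three processing types can be summarized in a conceptual Scala sketch; the types and function below are illustrative and not part of the Data Processing Library's API:

```scala
// Conceptual sketch of the three processing types and when incremental
// compilation remains enabled. These names are illustrative; they are
// not the Data Processing Library's API.
sealed trait ProcessingType
case class Reprocess(version: Long) extends ProcessingType
case class NoChanges(version: Long) extends ProcessingType
case class Changes(sinceVersion: Long, version: Long) extends ProcessingType

// lastCompiledVersion is the input catalog version used when the output
// catalog was last compiled (None if there was no previous compilation).
def incrementalCompilationEnabled(pt: ProcessingType,
                                  lastCompiledVersion: Option[Long]): Boolean =
  pt match {
    case Reprocess(_)      => false // always a full recompilation
    case NoChanges(v)      => lastCompiledVersion.contains(v)
    case Changes(since, _) => lastCompiledVersion.contains(since)
  }
```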

At the time of writing, the latest available version of the HERE Map Content catalog is 1.

Let's configure the rib catalog using reprocess for processing-type and 1 for version.

This is how pipeline-job.conf should look now:

pipeline.job.catalog-versions {

  input-catalogs {

    rib {
      processing-type = "reprocess"
      version = 1
    }
  }
}

Dependencies

The SDK Maven Archetypes provide all basic dependencies in the pom.xml file of the processor sub-project. You must manually add any custom dependencies used by your processing logic. For this project, we need two more dependencies:

  • com.here.schema.rib.topology-geometry_v2_scala, to deserialize the input partitions
  • com.example.nodecardinality.schema_v0_scala_2.11, that we have just created, to serialize the output partitions

Open the nodecardinality/processor/pom.xml file. There is already a placeholder for the Scala bindings created by the Schema project. To find it, search for DATA_MODEL_PROJECT_NAME in the file:

    <!--
      List data model artifacts that this compiler depends on,
      for input, for output or for both, as dependency here.
    -->
    <!--
      <dependency>
        <groupId>com.example.nodecardinality</groupId>
        <artifactId>{DATA_MODEL_PROJECT_NAME}_scala_2.11</artifactId>
        <version>{DATA_MODEL_PROJECT_VERSION}</version>
      </dependency>
    -->

Uncomment that dependency, then fill in the {DATA_MODEL_PROJECT_NAME} and {DATA_MODEL_PROJECT_VERSION} placeholders with schema_v0 and 1.0.0, respectively:

    <!--
      List data model artifacts that this compiler depends on,
      for input, for output or for both, as dependency here.
    -->
    <dependency>
      <groupId>com.example.nodecardinality</groupId>
      <artifactId>schema_v0_scala_2.11</artifactId>
      <version>1.0.0</version>
    </dependency>

Then, add a dependency on com.here.schema.rib.topology-geometry_v2_scala:

    <!--
      HERE Content Map Road Topology & Geometry layer Scala bindings
    -->
    <dependency>
      <groupId>com.here.schema.rib</groupId>
      <artifactId>topology-geometry_v2_scala</artifactId>
      <version>2.8.0</version>
    </dependency>

Processing Logic

We are now ready to implement the actual processing logic by editing the Scala source files that the Maven archetypes created. In the processor/src/main/scala/com/example/nodecardinality/processor folder we have five source files:

  1. Main.scala: contains the main entry point of the processing application, as a subclass of PipelineRunner. The Driver is configured with a single DriverTask containing one Direct1ToNCompiler (implemented in Compiler.scala).
  2. CompilerConfig.scala: contains the compiler configuration, a class we may define to configure our business logic through the application.conf configuration file. However, our business logic does not need to expose any configuration parameters, thus the default implementation is sufficient.
  3. IntermediateData.scala: defines the IntermediateData class used by the compiler defined in Compiler.scala.
  4. Compiler.scala: implements the actual processing logic as a Direct1ToNCompiler.
  5. LayersDef.scala: defines the input and output layers.

First, we need to decide which compilation pattern and intermediate data to use for the task at hand. This quick start focuses on functional patterns. Compared to the RDD-based patterns, we get incremental compilation without any extra work, and we do not have to deal with Spark caveats such as partitioning, shuffling, or persistence.

Understanding the underlying Spark application is still useful to see how the functional patterns work. All of the patterns implement different flavors of the following scheme:

  1. The input metadata is retrieved and an RDD of Key and Meta pairs is created. Key uniquely identifies a partition, and contains the catalog ID, the layer ID, and the partition name. Meta contains information about the payload of the partition, and can be used together with the corresponding Key to retrieve the content of a partition (payload).
  2. A CompileIn transformation is applied to the input RDD. The purpose of this step is to define the mapping between input and output partitions and to preprocess the input data into an intermediate representation that you define. In most compilation patterns, this step corresponds to a flatMap, where a compileInFn that returns a sequence of (Key, IntermediateData) pairs given a single (Key, Meta) pair is applied to all elements of the RDD. This is then followed by a groupBy transformation to group all intermediate representations with the same output key together.
  3. The resulting RDD of Key and Iterable<IntermediateData> pairs is then processed by applying a CompileOut transformation, where a Payload is produced for each Key from the grouped intermediate representations.
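The steps above can be sketched with plain Scala collections; the real implementation runs on Spark RDDs, and Key, Meta, and Payload here are simplified stand-ins, not the Data Processing Library's types:

```scala
// Plain-Scala sketch of the functional-pattern dataflow. The real
// implementation runs on Spark RDDs; Key, Meta, and Payload here are
// simplified stand-ins, not the Data Processing Library's types.
case class Key(catalog: String, layer: String, partition: String)
case class Meta(checksum: String)
case class Payload(content: Array[Byte])

// CompileIn: map each input partition to one or more output keys, each
// paired with an intermediate representation (here, just the checksum).
def compileInFn(in: (Key, Meta)): Seq[(Key, String)] =
  Seq((in._1.copy(catalog = "output", layer = "node-cardinality"), in._2.checksum))

// CompileOut: produce one payload per output key from the grouped
// intermediate representations.
def compileOutFn(outKey: Key, grouped: Iterable[String]): Payload =
  Payload(grouped.mkString(",").getBytes("UTF-8"))

def run(input: Seq[(Key, Meta)]): Map[Key, Payload] =
  input
    .flatMap(compileInFn)                                        // step 2: CompileIn
    .groupBy { case (outKey, _) => outKey }                      // group by output key
    .map { case (k, vs) => k -> compileOutFn(k, vs.map(_._2)) }  // step 3: CompileOut
```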

In this project, for each input partition of the HERE Map Content's topology-geometry layer, we want to create an output partition with the same Tile ID in the output catalog's node-cardinality layer. The mapping between input and output does not depend on the content of the input partitions; we just need the Tile ID that is part of the partition's Key. For this reason we can use a direct compiler.

We will implement a direct 1:1 compilation, since each input partition produces exactly one output partition. This is a special case of 1:N compilation, so the Direct1ToNCompiler pattern is sufficient.

We still have to decide what IntermediateData to use between CompileIn and CompileOut. Since we're performing a direct 1:1 compilation, we can implement the processing logic in CompileOut directly. That means we can forward the Key and Meta objects of the input partition from the CompileIn transformation to the CompileOut transformation and process the input data.

Notice that more complex IntermediateData classes containing a processed version of the input partition are usually necessary, especially when an input partition is used to compile multiple output partitions and we want to avoid processing the same data multiple times.

The default IntermediateData class provided by the Maven archetypes contains only two sample attributes, so we adapt it to wrap the input Key and Meta.

This is the IntermediateData class provided by the archetypes:

case class IntermediateData(attribute1: String, attribute2: String)

Let's rewrite the implementation to wrap a Key and Meta pair:

import com.here.platform.data.processing.compiler.{InKey, InMeta}

case class IntermediateData(key: InKey, meta: InMeta)

Save that file and open the processor/src/main/scala/com/example/nodecardinality/processor/LayersDef.scala file. Here, we will replace the placeholders for the input and output layers:

  object In {

    val CatalogId = Catalog.Id("input-catalog-1")

    val LayerId = Layer.Id("input-layer-1")

  }

  object Out {

    val LayerId = Layer.Id("output-layer-1")

  }

For input-catalog-1 we have to use the symbolic ID configured in pipeline-config.conf (rib). The input-layer-1 is topology-geometry and the output-layer-1 is node-cardinality:

object In {

  val CatalogId = Catalog.Id("rib")

  val LayerId = Layer.Id("topology-geometry")

}

object Out {

  val LayerId = Layer.Id("node-cardinality")

}

Now, let's open the processor/src/main/scala/com/example/nodecardinality/processor/Compiler.scala file.

We need to import the Scala bindings for the HERE Map Content topology-geometry layer and for our output layer:

import com.example.nodecardinality.schema.v0.schema._
import com.here.schema.rib.v2.topology_geometry_partition._

In a direct compiler, the CompileIn function is split into:

  • a mappingFn function that returns a sequence of output Keys given an input Key
  • a compileInFn function that returns an IntermediateData object given an input Key and Meta

The mapping established by mappingFn is used to send the IntermediateData object to the corresponding output Key, which is then compiled in the compileOutFn function.
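To make this flow concrete, the following self-contained sketch wires the three functions together over in-memory data. All types here (Key, Meta, Payload) and the run driver are simplified stand-ins for illustration only, not the Data Processing Library API:

```scala
// Simplified, self-contained model of a direct compiler's data flow.
object DirectCompilerSketch {

  final case class Key(catalog: String, layer: String, partition: String)
  final case class Meta(handle: String)
  final case class IntermediateData(key: Key, meta: Meta)
  final case class Payload(content: Array[Byte])

  // 1:1 mapping: each input partition produces one output partition.
  def mappingFn(inKey: Key): Iterable[Key] =
    Iterable(Key("output-catalog", "node-cardinality", inKey.partition))

  // Forward the input Key and Meta unchanged.
  def compileInFn(in: (Key, Meta)): IntermediateData =
    IntermediateData(in._1, in._2)

  // Produce the output payload for one output partition.
  def compileOutFn(outKey: Key, intermediate: IntermediateData): Option[Payload] =
    Some(Payload(s"compiled ${intermediate.key.partition}".getBytes("UTF-8")))

  // Conceptual driver: route each IntermediateData to its output Keys.
  def run(input: Map[Key, Meta]): Map[Key, Payload] =
    input.toSeq.flatMap { case (inKey, meta) =>
      val intermediate = compileInFn((inKey, meta))
      mappingFn(inKey).flatMap(outKey => compileOutFn(outKey, intermediate).map(outKey -> _))
    }.toMap
}
```

In the real library, the driver and the distribution of work across the Spark cluster are handled for you; only the three functions are yours to implement.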

Search for the mapping function:

  override def mappingFn(inKey: InKey): Iterable[OutKey] = ???

Each input Key has to be mapped to an output Key with the same partition name, a catalog ID equal to the output catalog (Default.OutCatalogId), and a layer ID equal to the output layer (Out.LayerId).

First, import the Default object:

import com.here.platform.data.processing.driver.Default

Then implement mappingFn as follows:

override def mappingFn(inKey: InKey): Iterable[OutKey] =
  Iterable(OutKey(Default.OutCatalogId, Out.LayerId, inKey.partition))

In compileInFn we simply return an IntermediateData built out of the Key and Meta of the input partition.

Replace the existing compileInFn method with the following:

override def compileInFn(in: (InKey, InMeta)): IntermediateData =
  IntermediateData(in.key, in.meta)

Then replace compileOutFn with the following:

override def compileOutFn(outKey: OutKey, intermediate: IntermediateData): Option[Payload] = {
  val payload = retriever.getPayload(intermediate.key, intermediate.meta)
  val partition = TopologyGeometryPartition.parseFrom(payload.content)
  val outputPartition =
    NodeCardinalityPartition(
      partition.node.map(node => NodeCardinality(node.identifier, node.segmentRef.size)))
  Some(Payload(outputPartition.toByteArray))
}

Here, the return values of mappingFn and compileInFn from above are passed to compileOutFn as the OutKey of the output partition and the IntermediateData containing the InKey and InMeta pair from the corresponding input partition.

We retrieve the payload of the input partition using the retriever object that's initialized when the Compiler object is constructed:

val payload = retriever.getPayload(intermediate.key, intermediate.meta)

Decode the corresponding topology partition using the Scala bindings of the HERE Map Content's topology-geometry layer:

val partition = TopologyGeometryPartition.parseFrom(payload.content)

Next, we create the corresponding output partition using the Scala bindings from our Schema project. The bindings provide a case class for each Protocol Buffer message defined in the schema. We construct an instance of NodeCardinalityPartition, whose single constructor argument represents the repeated field node_cardinality. We build its value by iterating over the list of nodes in the topology partition and constructing, for each node, a NodeCardinality object with the node's ID (node.identifier) and its cardinality (node.segmentRef.size).

val outputPartition =
  NodeCardinalityPartition(
    partition.node.map(node => NodeCardinality(node.identifier, node.segmentRef.size)))
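The transformation itself is plain Scala collection code. The following self-contained sketch shows the same computation on simplified stand-in types (the real Node, SegmentRef, and NodeCardinality classes come from the generated bindings, not from this sketch):

```scala
object CardinalitySketch {

  // Invented stand-ins for the generated Protocol Buffer bindings.
  final case class SegmentRef(identifier: String)
  final case class Node(identifier: String, segmentRef: Seq[SegmentRef])
  final case class NodeCardinality(identifier: String, cardinality: Int)

  // Same shape as the compiler code: one NodeCardinality per node,
  // counting that node's segment references.
  def cardinalities(nodes: Seq[Node]): Seq[NodeCardinality] =
    nodes.map(node => NodeCardinality(node.identifier, node.segmentRef.size))
}
```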

To publish the data, we serialize the output partition to a byte array (outputPartition.toByteArray), wrap it in a Payload object, and return it as an Option. If you don't want to publish a specific output partition to the output catalog, return None instead. In this case, however, we want to publish an output partition for each input partition available:

Some(Payload(outputPartition.toByteArray))

Now we can build the entire project from the top-level folder:

$ pwd
~/projects/nodecardinality

$ mvn install

Run The Processor Locally

Processing a global catalog like HERE Map Content can be time-consuming. However, we can limit the number of partitions processed during local development by adding one or more partition filters to the application.conf file.

In this case we use a BoundingBoxFilter to process only the partitions inside a bounding box containing the city of Berlin. Open the processor/src/main/resources/application.conf file and append this partition filter configuration:


here.platform.data-processing.executors.partitionKeyFilters = [
  {
    className = "BoundingBoxFilter"
    param = {
      boundingBox {
        // Berlin
        north = 52.67551
        south = 52.338261
        east = 13.76116
        west = 13.08835
      }
    }
  }
]
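Conceptually, the filter keeps only the partitions that intersect the configured box. As an illustration only (this is not the library's implementation), a point-in-box predicate over the same coordinates looks like this:

```scala
object BoundingBoxSketch {

  final case class BoundingBox(north: Double, south: Double, east: Double, west: Double)

  // Same coordinates as the Berlin bounding box in application.conf.
  val berlin = BoundingBox(north = 52.67551, south = 52.338261, east = 13.76116, west = 13.08835)

  // A point is inside the box when it lies between the south/north
  // latitudes and the west/east longitudes.
  def contains(box: BoundingBox, lat: Double, lon: Double): Boolean =
    lat >= box.south && lat <= box.north && lon >= box.west && lon <= box.east
}
```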

Make sure to rerun mvn install after making this change. From the processor module folder, you can then run the compilation job using the configuration files set up above:

$ pwd
~/projects/nodecardinality/processor

$ mvn exec:java -Dexec.mainClass=com.example.nodecardinality.processor.Main \
                -Dpipeline-config.file=config/pipeline-config.conf          \
                -Dpipeline-job.file=config/pipeline-job.conf                \
                -Dexec.args="--master local[*]"

The Maven exec plugin is used with the main class set to com.example.nodecardinality.processor.Main and the configuration files config/pipeline-config.conf and config/pipeline-job.conf. The PipelineRunner main method accepts an optional --master command line argument to set the Spark master URL for the cluster. Use local[*] to run Spark locally.

Inspect The Catalog

To inspect the catalog we created:

  1. Log into the HERE platform. Search for the catalog from the Data tab and select it to switch to the catalog view.
  2. Select the node-cardinality layer and select the Inspect tab.
  3. Set the zoom level to 8 and search the map for Berlin.
  4. The output partitions are highlighted on the map, covering the bounding box of Berlin we specified.

Decode Partitions

To decode the output partitions, we need to configure the layer to use our custom schema. First we have to upload our schema to the platform. From the nodecardinality/schema/ folder, run mvn deploy:

$ pwd
~/projects/nodecardinality/schema

$ mvn deploy

A schema with the same group ID, artifact ID, and version can only be deployed once. To avoid collisions, ensure that your group ID (com.example.nodecardinality in this example) is unique.

In the Data view of the portal, click Browse schemas to display a list of deployed schemas your user can access, and make sure our schema, schema_v0, is there.

From the node-cardinality layer view of the portal, click More and then select Reconfigure layer to open the layer configuration page. There you can change most of the parameters you set when creating the layer, including the schema configuration. Locate the Schema configuration, select schema_v0 from the drop-down menu, and click Save at the bottom of the page.

From the Inspect tab, you can now select any partition. The decoded partition is displayed on the panel to the right.

Run The Processor as the Pipeline on the HERE Platform

Using the Pipeline API, you can run the batch pipeline we just created on a cluster. You can deploy your pipeline in one of two ways:

  • a scheduled pipeline, which runs automatically every time there is a new version of an input catalog
  • a run-once pipeline, which uses a pipeline-job.conf configuration that you provide

To use the Pipeline API, your App ID must belong to a Group. Your administrator must set up a Group ID in your account and assign your application to the group. Finally, you must grant that Group ID Read and Write access to the output catalog.

To deploy the processor to the platform pipeline we first need to package it into a fat JAR. The pom.xml file generated by the Archetypes contains a platform profile for this purpose:

$ pwd
~/projects/nodecardinality/processor

$ mvn -Pplatform package

The above command creates a fat JAR of the processor as processor/target/processor-1.0-SNAPSHOT-platform.jar.

You must use this file to create a Pipeline Template.

Refer to the Pipeline User Guide for detailed instructions on deploying and running pipelines from the HERE platform portal.

Additionally, you can deploy and run a pipeline with the OLP CLI. Refer to the OLP CLI User Guide for details.
