This example demonstrates how to do the following:
Use the CLI to create a local output catalog to run your processing pipeline during development
Use the CLI to create a local copy of the input platform-hosted catalog, to create a fully offline development environment
Use the Data Client Library, local catalogs, and your favorite testing framework to test your processing pipeline.
About local catalogs
A local catalog is a special catalog encoded in a file and stored in the ~/.here/local/ directory. The CLI and Data Client Library can spawn a local version of the Data API that reads and writes to local catalogs. Every local catalog file has the filename <catalog-id>.db, and can be renamed (to change the catalog ID), copied, and moved to other machines, to share local catalogs within or across engineering teams. Catalogs can be added to the ~/.here/local/ directory and be immediately used with the CLI and the Data Client Library.
Local catalogs are identified by the special HRN hrn:local:data:::<catalog-id>.
No authentication and no access to the external network are needed to use local catalogs, so you do not incur any additional cost for accessing them. Since they are stored on your local machine, they are not subject to naming conflicts within your realm.
Additionally, local catalogs can be optionally created and stored in memory, rather than persisted on disk, which makes them especially useful during development and automated testing.
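For example, because a local catalog is just a file, sharing or duplicating one amounts to copying its .db file. The sketch below copies the node-count catalog created later in this tutorial under a new, hypothetical catalog ID:

# Copy an existing local catalog under a new catalog ID; the copy is immediately
# usable as hrn:local:data:::node-count-backup with the CLI and the Data Client Library.
cp ~/.here/local/node-count.db ~/.here/local/node-count-backup.db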
Note
Local catalogs may only be used for development and testing purposes. Running production use cases with local catalogs is not permitted.
A sample processing application
Local catalogs can be used during the development of a wide range of applications, from standalone applications to Spark- or Flink-based ones, and with versioned, stream, index, and volatile layers.
In this tutorial you use the Data Client Library's Spark Connector, but the steps to run and test this application with local catalogs apply to all other scenarios as well.
First, develop an application that, for each topology partition in HERE Map Content, counts the number of topology nodes in each level 14 sub-tile, encodes this information in level 12 GeoJSON tiles, and saves the result in an output versioned layer:
Figure 1. Sample processing logic
You use different shades of red based on the number of nodes, to create a "heatmap" representing the density of intersections in the sub-tile:
Figure 2. Final output
Set up the Maven project
Set up the Maven project structure for the application and add the following dependencies to its pom.xml:
<dependencies>
  <dependency>
    <groupId>com.here.platform.pipeline</groupId>
    <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>com.here.platform.data.client</groupId>
    <artifactId>spark-support_${scala.compat.version}</artifactId>
  </dependency>
  <!-- To enable support of local catalogs -->
  <dependency>
    <groupId>com.here.platform.data.client</groupId>
    <artifactId>local-support_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>com.here.hrn</groupId>
    <artifactId>hrn_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
  </dependency>
  <dependency>
    <groupId>com.here.schema.rib</groupId>
    <artifactId>topology-geometry_v2_scala_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>com.here.schema.rib</groupId>
    <artifactId>topology-geometry_v2_java</artifactId>
  </dependency>
  <dependency>
    <groupId>com.thesamet.scalapb</groupId>
    <artifactId>sparksql-scalapb_${scala.compat.version}</artifactId>
    <version>0.10.4</version>
  </dependency>
  <!-- To test the Scala version of the application -->
  <dependency>
    <groupId>org.scalatest</groupId>
    <artifactId>scalatest_${scala.compat.version}</artifactId>
    <version>3.0.1</version>
    <scope>test</scope>
  </dependency>
  <!-- To test the Java version of the application -->
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
</dependencies>
Create a local output catalog
First, you develop your pipeline, reading from HERE Map Content and writing to a local catalog. Later, you create a local copy of the HERE Map Content catalog to produce a completely offline development environment.
Creating a local catalog with the CLI is very similar to creating a catalog on the platform. You use the following catalog configuration, saved in this tutorial as local-development-workflow-output-catalog.json:
{"id":"node-count","name":"Aggregated node count at level 14 (From tutorial)","summary":"Here Map Content node count per level 14 tile.","description":"Here Map Content node count per level 14 tile.","tags":["Tutorial"],"layers":[{"id":"node-count","name":"node-count","summary":"Node count.","description":"Node count.","contentType":"application/vnd.geo+json","layerType":"versioned","volume":{"volumeType":"durable"},"partitioning":{"scheme":"heretile","tileLevels":[12]}}]}
The catalog contains a GeoJSON layer at level 12 (the same as in the input catalog). Create the catalog by running the following command:
olp local catalog create node-count "Node Count" \
    --config local-development-workflow-output-catalog.json
Note
If a billing tag is required in your realm, update the config file by adding the billingTags: ["YOUR_BILLING_TAG"] property to the layer section.
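For example, the layer entry in the catalog configuration above would then look as follows, where YOUR_BILLING_TAG is a placeholder for a billing tag valid in your realm:

{
  "id": "node-count",
  "name": "node-count",
  "summary": "Node count.",
  "description": "Node count.",
  "contentType": "application/vnd.geo+json",
  "layerType": "versioned",
  "volume": { "volumeType": "durable" },
  "partitioning": { "scheme": "heretile", "tileLevels": [12] },
  "billingTags": ["YOUR_BILLING_TAG"]
}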
The CLI returns the following:
Catalog hrn:local:data:::node-count has been created.
You now have access to the local catalog hrn:local:data:::node-count and can experiment with the local variants of the CLI catalog commands:
olp local catalog list
olp local catalog show hrn:local:data:::node-count
olp local catalog layer show hrn:local:data:::node-count node-count
For more details on the available CLI commands for local catalogs, see the OLP CLI documentation.
The catalog data is stored in the ~/.here/local/node-count.db file, which you can verify with:
ls ~/.here/local/
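If the catalog was created successfully, the listing contains the catalog file:

node-count.db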
You can now create a pipeline-config.conf file specifying HERE Map Content as the input catalog and your newly created local catalog as the output catalog:
pipeline.config {

  // Make sure the local catalog has been created first:
  // olp local catalog create node-count "Node Count" --config local-development-workflow-output-catalog.json
  output-catalog { hrn = "hrn:local:data:::node-count" }

  input-catalogs {
    here-map-content { hrn = "hrn:here:data::olp-here:rib-2" }
  }
}
Implement the application
The sample application reads and processes all input partitions within a bounding box, specified in the application.conf configuration file:
tutorial {
  boundingBox {
    // Berlin
    north = 52.67551
    south = 52.33826
    east = 13.76116
    west = 13.08835
  }
}
To implement the logic, use the Data Client Library Spark connector and typical Spark transformations. Note that you do not need to fully understand the processing logic to proceed with the tutorial; at the end of this exercise, you test this code as a black box.
Scala
Java
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

import com.here.olp.util.geo.BoundingBox
import com.here.olp.util.quad.factory.HereQuadFactory
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.data.client.spark.scaladsl.{
  GroupedData,
  VersionedDataConverter,
  VersionedRowMetadata
}
import com.here.platform.pipeline.PipelineContext
import com.here.schema.rib.v2.topology_geometry_partition.TopologyGeometryPartition
import com.typesafe.config.ConfigBeanFactory
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import scalapb.spark.Implicits._

object SparkConnectorLocalScala {

  // We expose two public methods, `main` and `run`. `main` reads the pipeline context and the
  // configuration from `pipeline-config.conf` and `application.conf`, while `run` accepts them
  // as parameters. This makes it convenient to test the logic later using `run` with different
  // configuration parameters, without having to pass them through the classpath.
  def main(args: Array[String]): Unit = {
    // Read the default pipeline context
    val pipelineContext = new PipelineContext

    // Read the bounding box configured in `application.conf`
    val bbox = ConfigBeanFactory.create(
      pipelineContext.applicationConfig.getConfig("tutorial.boundingBox"),
      classOf[BoundingBox])

    run(pipelineContext, bbox)
  }

  def run(pipelineContext: PipelineContext, bbox: BoundingBox): Unit = {
    // Defines the input / output catalogs to be read / written to
    val inputHrn = pipelineContext.config.inputCatalogs("here-map-content")
    val outputHrn = pipelineContext.config.outputCatalog

    // Input / output layers
    val topologyGeometryLayer = "topology-geometry"
    val outputLayer = "node-count"

    val sparkSession: SparkSession =
      SparkSession.builder().appName("SparkTopologyNodeCount").getOrCreate()

    // Read the input data as a Dataset of topology partitions
    val topologyGeometryData: Dataset[TopologyGeometryPartition] = sparkSession
      .readLayer(inputHrn, topologyGeometryLayer)
      .query(
        s"mt_partition=inboundingbox=(${bbox.getNorth},${bbox.getSouth},${bbox.getEast},${bbox.getWest})")
      .option("olp.connector.force-raw-data", value = true)
      .load()
      .select("data")
      .as[Array[Byte]]
      .map(TopologyGeometryPartition.parseFrom)

    // This is needed for implicit conversions - moved down to avoid conflicts with scalapb.spark.Implicits._
    import sparkSession.implicits._

    // Compute the output partitions
    val nodeCountPartitions: Dataset[NodeCountPartition] = topologyGeometryData.map { partition =>
      val quads = partition.node
        .flatMap(_.geometry)
        .map(
          g => HereQuadFactory.INSTANCE.getMapQuadByLocation(g.latitude, g.longitude, 14).getLongKey
        )
        .groupBy(identity)
        .map { case (tileId, seq) => NodeCount(tileId, seq.size) }
      NodeCountPartition(partition.partitionName, quads.toSeq)
    }

    // Encode the output partitions, convert to DataFrame and publish the output data
    nodeCountPartitions
      .map(p => (p.partitionName, p.toGeoJson.getBytes))
      .toDF("mt_partition", "data")
      .writeLayer(outputHrn, outputLayer)
      .withDataConverter(new VersionedDataConverter {
        def serializeGroup(rowMetadata: VersionedRowMetadata,
                           rows: Iterator[Row]): GroupedData[VersionedRowMetadata] =
          GroupedData(rowMetadata, rows.next().getAs[Array[Byte]]("data"))
      })
      .save()

    sparkSession.stop()
  }

  // Class representing the number of nodes in a level 14 sub-tile
  case class NodeCount(tileId: Long, count: Int) {
    def toGeoJson: String = {
      val box = HereQuadFactory.INSTANCE.getMapQuadByLongKey(tileId).getBoundingBox
      val e = box.getEast
      val w = box.getWest
      val n = box.getNorth
      val s = box.getSouth
      val color = {
        // from black (0 nodes) to red (500 nodes)
        s"rgb(${(count.min(500).toDouble / 500 * 255).toInt},0,0)"
      }
      """{"type":"Feature",""" +
        s""""geometry":{"type":"Polygon","coordinates":[[[$w,$s],[$e,$s],[$e,$n],[$w,$n],[$w,$s]]]},""" +
        s""""properties":{"description":{"tileId":$tileId,"count":$count},"style":{"color":"$color"}}}"""
    }
  }

  // Class representing the decoded content of an output partition
  case class NodeCountPartition(partitionName: String, counts: Seq[NodeCount]) {
    def toGeoJson: String =
      s"""{"type":"FeatureCollection","features":[${counts.map(_.toGeoJson).mkString(",")}]}"""
  }
}
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

import com.here.hrn.HRN;
import com.here.olp.util.geo.BoundingBox;
import com.here.olp.util.quad.factory.HereQuadFactory;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.data.client.spark.javadsl.VersionedDataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.VersionedRowMetadata;
import com.here.platform.pipeline.PipelineContext;
import com.here.schema.rib.v2.TopologyGeometry.Node;
import com.here.schema.rib.v2.TopologyGeometryPartitionOuterClass;
import com.here.schema.rib.v2.TopologyGeometryPartitionOuterClass.TopologyGeometryPartition;
import com.typesafe.config.ConfigBeanFactory;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkConnectorLocal {

  // Class representing the number of nodes in a level 14 sub-tile
  public static class NodeCount {
    private long tileId;
    private int count;

    public NodeCount(long tileId, int count) {
      this.tileId = tileId;
      this.count = count;
    }

    public String toGeoJson() {
      BoundingBox box = HereQuadFactory.INSTANCE.getMapQuadByLongKey(tileId).getBoundingBox();
      double e = box.getEast();
      double w = box.getWest();
      double n = box.getNorth();
      double s = box.getSouth();
      // from black (0 nodes) to red (500 nodes)
      String color = String.format("rgb(%d,0,0)", (int) (Math.min(count, 500) / 500.0 * 255));
      return String.format(
          "{\"type\":\"Feature\","
              + "\"geometry\":{\"type\":\"Polygon\",\"coordinates\":[[[%f, %f], [%f, %f], [%f, %f], [%f, %f], [%f, %f]]]},"
              + "\"properties\":{\"description\":{\"tileId\":%d,\"count\":%d},\"style\":{\"color\":\"%s\"}}}",
          w, s, e, s, e, n, w, n, w, s, tileId, count, color);
    }
  }

  // Class representing the decoded content of an output partition
  public static class NodeCountPartition {
    private String partitionName;
    private List<NodeCount> counts;

    public NodeCountPartition(String partitionName, List<NodeCount> counts) {
      this.partitionName = partitionName;
      this.counts = counts;
    }

    public String toGeoJson() {
      return String.format(
          "{\"type\":\"FeatureCollection\",\"features\":[%s]}",
          counts.stream().map(NodeCount::toGeoJson).collect(Collectors.joining(",")));
    }

    public String getPartitionName() {
      return partitionName;
    }
  }

  // Class representing an encoded output partition
  public static class OutputData implements Serializable {
    private String mt_partition;
    private byte[] data;

    public OutputData(String mt_partition, byte[] data) {
      this.mt_partition = mt_partition;
      this.data = data;
    }

    // Minimal bean interface - required to use Encoders.bean
    public String getMt_partition() {
      return mt_partition;
    }

    public void setMt_partition(String mt_partition) {
      this.mt_partition = mt_partition;
    }

    public byte[] getData() {
      return data;
    }

    public void setData(byte[] data) {
      this.data = data;
    }
  }

  // We expose two static methods, `main` and `run`. `main` reads the pipeline context and the
  // configuration from `pipeline-config.conf` and `application.conf`, while `run` accepts them
  // as parameters. This makes it convenient to test the logic later using `run` with different
  // configuration parameters, without having to pass them through the classpath.
  public static void main(String[] args) {
    // Read the default pipeline context
    PipelineContext pipelineContext = new PipelineContext();

    // Read the bounding box configured in `application.conf`
    BoundingBox boundingBox =
        ConfigBeanFactory.create(
            pipelineContext.applicationConfig().getConfig("tutorial.boundingBox"),
            BoundingBox.class);

    run(pipelineContext, boundingBox);
  }

  public static void run(PipelineContext pipelineContext, BoundingBox bbox) {
    // Defines the input / output catalogs to be read / written to
    HRN inputHrn = pipelineContext.getConfig().getInputCatalogs().get("here-map-content");
    HRN outputHrn = pipelineContext.getConfig().getOutputCatalog();

    // Input / output layers
    String topologyGeometryLayer = "topology-geometry";
    String outputLayer = "node-count";

    SparkSession sparkSession =
        SparkSession.builder().appName("SparkTopologyNodeCount").getOrCreate();

    // Read the input data as a Dataset of topology partitions
    Dataset<TopologyGeometryPartitionOuterClass.TopologyGeometryPartition> topologyGeometryData =
        JavaLayerDataFrameReader.create(sparkSession)
            .readLayer(inputHrn, topologyGeometryLayer)
            .query(
                String.format(
                    "mt_partition=inboundingbox=(%f,%f,%f,%f)",
                    bbox.getNorth(), bbox.getSouth(), bbox.getEast(), bbox.getWest()))
            .option("olp.connector.force-raw-data", true)
            .load()
            .select("data")
            .as(Encoders.BINARY())
            .map(
                (MapFunction<byte[], TopologyGeometryPartition>) TopologyGeometryPartition::parseFrom,
                Encoders.kryo(TopologyGeometryPartitionOuterClass.TopologyGeometryPartition.class));

    // Compute the output partitions
    Dataset<NodeCountPartition> nodeCountPartitions =
        topologyGeometryData.map(
            (MapFunction<TopologyGeometryPartition, NodeCountPartition>)
                partition -> {
                  List<NodeCount> counts =
                      partition
                          .getNodeList()
                          .stream()
                          .map(Node::getGeometry)
                          .map(
                              g ->
                                  HereQuadFactory.INSTANCE
                                      .getMapQuadByLocation(g.getLatitude(), g.getLongitude(), 14)
                                      .getLongKey())
                          .collect(Collectors.groupingBy(t -> t))
                          .entrySet()
                          .stream()
                          .map(e -> new NodeCount(e.getKey(), e.getValue().size()))
                          .collect(Collectors.toList());
                  return new NodeCountPartition(partition.getPartitionName(), counts);
                },
            Encoders.kryo(NodeCountPartition.class));

    // Encode the output partitions, convert to DataFrame and publish the output data
    JavaLayerDataFrameWriter.create(
            nodeCountPartitions
                .map(
                    (MapFunction<NodeCountPartition, OutputData>)
                        p -> new OutputData(p.getPartitionName(), p.toGeoJson().getBytes()),
                    Encoders.bean(OutputData.class))
                .toDF())
        .writeLayer(outputHrn, outputLayer)
        .withDataConverter(
            new VersionedDataConverter() {
              @Override
              public GroupedData<VersionedRowMetadata> serializeGroup(
                  VersionedRowMetadata rowMetadata, Iterator<Row> rows) {
                return new GroupedData<>(rowMetadata, rows.next().getAs("data"));
              }
            })
        .save();

    sparkSession.stop();
  }
}
Compile and run locally
To run the application locally, execute the following command:
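A minimal sketch for the Scala variant uses the exec-maven-plugin, assuming pipeline-config.conf and application.conf are available on the classpath (for example, under src/main/resources); adjust the command to your own project setup:

mvn compile exec:java \
    -Dexec.mainClass=SparkConnectorLocalScala \
    -Dspark.master="local[*]"

For the Java variant, point exec.mainClass at SparkConnectorLocal instead.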
After one run, you can inspect the local output layer with the CLI and the included local Data Inspector:
olp local catalog layer inspect hrn:local:data:::node-count node-count
Create an offline development environment
So far, you have only used an input catalog hosted on the platform. To create a development environment which does not require access to the external network nor HERE credentials, you now create a local copy of the HERE Map Content catalog, by running the following command:
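(The option names in the sketch below are assumptions based on the behavior described next; check olp local catalog copy create --help for the exact syntax of your CLI version.)

olp local catalog copy create here-map-content "HERE Map Content (local copy)" \
    --source hrn:here:data::olp-here:rib-2 \
    --layers topology-geometry \
    --filter-boundingbox 52.33826,13.08835,52.67551,13.76116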
This creates a local catalog hrn:local:data:::here-map-content, copying the latest version of the HERE Map Content catalog, initializing only the topology-geometry layer, and including only the partitions contained in the specified bounding box. Local catalog copies are special local catalogs that retain information about the source catalog, which allows the local copy to be updated later by running:
olp local catalog copy update hrn:local:data:::here-map-content
You can now modify the pipeline configuration to use this local copy as input:
pipeline.config {

  output-catalog { hrn = "hrn:local:data:::node-count" }

  input-catalogs {
    // Notice how we use a local catalog here - instead of the HERE Map Content catalog on the
    // platform.
    here-map-content { hrn = "hrn:local:data:::here-map-content" }
  }
}
Optionally, if you do not need to access any catalog on the platform, you can configure the Data Client Library to use only local catalogs by setting here.platform.data-client.endpoint-locator.discovery-service-env=local. This also prevents the library from trying to fetch an authentication token.
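For example, you can add the following to the application.conf used during local development (the same setting appears later in the test configuration):

here.platform.data-client {
  # Resolve all catalogs in the local environment; no credentials are needed.
  endpoint-locator.discovery-service-env = local
}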
Now you can develop and run your application with no internet connection, for example, while commuting to work.
Note
In this tutorial, you have used the query functionality of the Data Client Library to process only a small bounding box, so run times are fast regardless of whether you read from the platform-hosted catalog or from a local copy. For applications that process the entire input, it is recommended to first create a clipped local copy of the input catalog to keep development cycles fast, and to test your logic with the real data in a pipeline only after ensuring that it works correctly with local data.
Test the sample application
Local catalogs make it convenient to test the overall business logic of a processing application. Unit testing single methods is an important but time-consuming practice, and testing the whole pipeline as a black box is just as important. However, black-box testing usually requires creating and accessing catalog resources on the platform, which might be too slow and costly for frequently run automated test suites. Moreover, each developer, or even each test run, needs access to a unique output catalog, which further complicates the setup. Local in-memory catalogs are ideal in this situation, as they are kept in memory and are private to a single process instance.
Note
The maximum size of in-memory catalogs is limited by the amount of memory available to the JVM. If you need to run performance or stress tests with large local catalogs, you may decide to use persisted catalogs (here.platform.data-local.memory-mode=false).
First, create your test configuration in the file src/test/resources/application.conf. Configure the Data Client Library to use the local environment by default, which is needed to create catalogs in that environment with the AdminApi, and enable in-memory local catalogs:
here.platform.data-client {
# force local environment for all catalog accesses
endpoint-locator.discovery-service-env = local
}
here.platform.data-local {
# use in-memory local catalogs
  memory-mode = true
}
Run a single test to verify that the application correctly computes the number of nodes in each level 14 tile. To this end, you first create some pseudo-random input data starting from an expected output, encode it in topology partitions, run the pipeline, and verify that the generated data matches the expected output. This setup can be reused to run multiple tests with different data, allowing you to cover all edge cases:
Scala
Java
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

import com.here.hrn.HRN
import com.here.olp.util.geo.BoundingBox
import com.here.olp.util.quad.factory.HereQuadFactory
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.model._
import com.here.platform.data.client.scaladsl.{DataClient, NewPartition}
import com.here.platform.data.client.spark.DataClientSparkContextUtils.context._
import com.here.platform.data.client.spark.SparkSupport._
import com.here.platform.pipeline.{PipelineConfig, PipelineContext}
import com.here.schema.geometry.v2.geometry.Point
import com.here.schema.rib.v2.topology_geometry.Node
import com.here.schema.rib.v2.topology_geometry_partition.TopologyGeometryPartition
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpec, Inspectors, Matchers}

import java.util.UUID
import scala.collection.JavaConverters._
import scala.util.Random

@RunWith(classOf[JUnitRunner])
class SparkConnectorLocalScalaTest extends FlatSpec with Matchers with Inspectors {

  // Specify the bounding box used by our pipeline under test.
  private val testBoundingBox = new BoundingBox(52.67551, 52.33826, 13.76116, 13.08835)

  // Utilities to create a random topology node within a bounding box.
  private val random = new Random(42)

  private def randomInRange(min: Double, max: Double): Double =
    min + (max - min) * random.nextDouble()

  private def randomNode(boundingBox: BoundingBox): Node =
    Node(
      geometry = Some(
        Point(
          latitude = randomInRange(boundingBox.getSouth, boundingBox.getNorth),
          longitude = randomInRange(boundingBox.getWest, boundingBox.getEast))))

  // Utility to create a catalog with a single HERE Tile versioned layer. The catalog will be local
  // because we use `here.platform.data-client.endpoint-locator.discovery-service-env=local` in
  // src/test/resources/application.conf. It will be kept in memory only, because we also use
  // `here.platform.data-local.memory-mode=true`.
  private def createCatalog(catalogIdPrefix: String, layerId: String): HRN =
    DataClient()
      .adminApi()
      .createCatalog(
        WritableCatalogConfiguration(
          catalogIdPrefix + "-" + UUID.randomUUID().toString,
          catalogIdPrefix,
          "summary",
          "description",
          Seq(
            WritableLayer(
              layerId,
              layerId,
              "summary",
              "description",
              LayerTypes.Versioned,
              HereTilePartitioning(List(12)),
              DurableVolume,
              contentType = "application/octet-stream"))))
      .awaitResult()

  // Utility to publish some test data in a catalog.
  private def writeData(hrn: HRN, layer: String, data: Map[String, Array[Byte]]): Unit = {
    val partitions = data.map {
      case (partitionId, data) =>
        NewPartition(
          partitionId,
          layer,
          NewPartition.ByteArrayData(data))
    }
    DataEngine()
      .writeEngine(hrn)
      .publishBatch2(4, Some(Seq(layer)), Nil, partitions.iterator)
      .awaitResult()
  }

  // Utility to read the data from the latest version of a catalog.
  private def readData(hrn: HRN, layer: String): Map[String, Array[Byte]] = {
    val readEngine = DataEngine().readEngine(hrn)
    val queryApi = DataClient().queryApi(hrn)
    queryApi
      .getLatestVersion()
      .flatMap {
        case None => fail("Catalog is empty")
        case Some(v) => queryApi.getPartitionsAsIterator(v, layer)
      }
      .awaitResult()
      .map(p => p.partition -> readEngine.getDataAsBytes(p).awaitResult())
      .toMap
  }

  // Utility to create an input test catalog given the expected outcome. `expectedCounts` is a map
  // containing, for each level 14 tile ID, the number of nodes contained in that sub-tile.
  // This method creates the requested number of topology nodes in each level-14 tile, encodes them
  // in a topology partition and publishes them in a freshly created test catalog.
  private def createTestInputCatalog(expectedCounts: Map[Long, Int]): HRN = {
    val hrn = createCatalog("input", "topology-geometry")
    val partitions: Map[String, Array[Byte]] = expectedCounts
      .map {
        case (tileId, count) =>
          val boundingBox = HereQuadFactory.INSTANCE.getMapQuadByLongKey(tileId).getBoundingBox
          val nodes = (1L to count).map(_ => randomNode(boundingBox))
          tileId -> nodes
      }
      .groupBy {
        case (tileId, _) =>
          HereQuadFactory.INSTANCE.getMapQuadByLongKey(tileId).getAncestor(12).getLongKey
      }
      .map {
        case (tileId, nodes) =>
          tileId.toString -> TopologyGeometryPartition(
            partitionName = tileId.toString,
            node = nodes.values.reduce(_ ++ _)).toByteArray
      }
    writeData(hrn, "topology-geometry", partitions)
    hrn
  }

  // Utility to create an output catalog for the pipeline.
  private def createTestOutputCatalog(): HRN = createCatalog("output", "node-count")

  // Utility to retrieve the actual node count per level 14 tile in the output catalog. It reads the
  // latest version of the catalog, retrieves the data, decodes it and returns the node count
  // encoded in each level 14 tile. This is used, after a run of the pipeline under test, to verify
  // that the node count is the expected one.
  private def readActualNodeCount(hrn: HRN): Map[Long, Int] = {
    // We use regular expressions to extract the tileId of the level 14 tiles and their node count.
    // Using a proper JSON parser is left as an exercise.
    val descriptionRegex = """"description":\s*\{"tileId":\s*(\d+),\s*"count":\s*(\d+)}""".r.unanchored
    readData(hrn, "node-count").flatMap {
      case (_, data) =>
        val geoJson = new String(data)
        descriptionRegex.findAllMatchIn(geoJson).map(m => m.group(1).toLong -> m.group(2).toInt)
    }
  }

  // Randomly generated expected result. For each level 14 tile in `testBoundingBox` we generate a
  // random non-negative node count. We filter out zeros because those are by design not encoded by
  // the pipeline under test.
  private val expectedNodeCount: Map[Long, Int] = {
    HereQuadFactory.INSTANCE
      .iterableBoundingBoxToMapQuad(testBoundingBox, 14)
      .asScala
      .map { quad =>
        quad.getLongKey -> (random.nextInt.abs % 100)
      }
      .filter(_._2 != 0)
      .toMap
  }

  "SparkConnectorLocalScala" should "correctly compute the number of nodes in each sub-tile" in {
    // Create the input catalog and publish the test data.
    val inputHrn = createTestInputCatalog(expectedNodeCount)
    // Create an empty output catalog.
    val outputHrn = createTestOutputCatalog()

    val pipelineContext =
      PipelineContext(PipelineConfig(outputHrn, Map("here-map-content" -> inputHrn)))

    // Run the pipeline under test.
    SparkConnectorLocalScala.run(pipelineContext, testBoundingBox)

    // Check the output data.
    readActualNodeCount(outputHrn) should contain theSameElementsAs expectedNodeCount
  }
}
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

import static junit.framework.TestCase.assertEquals;

import com.here.hrn.HRN;
import com.here.olp.util.geo.BoundingBox;
import com.here.olp.util.quad.factory.HereQuadFactory;
import com.here.platform.data.client.engine.javadsl.DataEngine;
import com.here.platform.data.client.engine.javadsl.ReadEngine;
import com.here.platform.data.client.javadsl.DataClient;
import com.here.platform.data.client.javadsl.NewPartition;
import com.here.platform.data.client.javadsl.Partition;
import com.here.platform.data.client.javadsl.QueryApi;
import com.here.platform.data.client.model.*;
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
import com.here.platform.pipeline.PipelineConfig;
import com.here.platform.pipeline.PipelineContext;
import com.here.schema.geometry.v2.GeometryOuterClass;
import com.here.schema.rib.v2.TopologyGeometry;
import com.here.schema.rib.v2.TopologyGeometryPartitionOuterClass;
import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import org.junit.Test;

public class SparkConnectorLocalTest {

  // Simple Pair class used to collect maps with Java streams.
  static class Pair<K, V> {
    private K key;
    private V value;

    public Pair(K key, V value) {
      this.key = key;
      this.value = value;
    }

    public K getKey() {
      return key;
    }

    public V getValue() {
      return value;
    }
  }

  // Specify the bounding box used by our pipeline under test.
  private BoundingBox testBoundingBox = new BoundingBox(52.67551, 52.33826, 13.76116, 13.08835);

  // Utilities to create a random topology node within a bounding box.
  private Random random = new Random(42);

  private double randomInRange(double min, double max) {
    return min + (max - min) * random.nextDouble();
  }

  private TopologyGeometry.Node randomNode(BoundingBox boundingBox) {
    return TopologyGeometry.Node.newBuilder()
        .setGeometry(
            GeometryOuterClass.Point.newBuilder()
                .setLatitude(randomInRange(boundingBox.getSouth(), boundingBox.getNorth()))
                .setLongitude(randomInRange(boundingBox.getWest(), boundingBox.getEast()))
                .build())
        .build();
  }

  // Utility to create a catalog with a single HERE Tile versioned layer. The catalog will be local
  // because we use `here.platform.data-client.endpoint-locator.discovery-service-env=local` in
  // src/test/resources/application.conf. It will be kept in memory only, because we also use
  // `here.platform.data-local.memory-mode=true`.
  private HRN createCatalog(String catalogIdPrefix, String layerId) {
    return await(
        DataClient.get(DataClientSparkContextUtils.context().actorSystem())
            .adminApi()
            .createCatalog(
                new WritableCatalogConfiguration.Builder()
                    .withId(catalogIdPrefix + "-" + UUID.randomUUID().toString())
                    .withName("name")
                    .withSummary("summary")
                    .withDescription("description")
                    .withLayers(
                        Collections.singletonList(
                            new WritableLayer.Builder()
                                .withId(layerId)
                                .withName("name")
                                .withSummary("summary")
                                .withDescription("description")
                                .withLayerType(LayerTypes.Versioned())
                                .withPartitioning(
                                    Partitioning.HereTile()
                                        .withTileLevels(Collections.singletonList(12))
                                        .build())
                                .withVolume(Volumes.Durable())
                                .withContentType("application/octet-stream")))
                    .build()));
  }

  // Utility to publish some test data in a catalog.
  private void writeData(HRN hrn, String layer, Map<String, byte[]> data) {
    Iterator<NewPartition> partitions =
        data.entrySet()
            .stream()
            .map(
                e ->
                    new NewPartition.Builder()
                        .withPartition(e.getKey())
                        .withLayer(layer)
                        .withData(e.getValue())
                        .build())
            .collect(Collectors.toList())
            .iterator();
    await(
        DataEngine.get(DataClientSparkContextUtils.context().actorSystem())
            .writeEngine(hrn)
            .publishBatch2(
                4,
                Optional.of(Collections.singletonList(layer)),
                Collections.emptyList(),
                partitions));
  }

  // Utility to read the data from the latest version of a catalog.
  private Map<String, byte[]> readData(HRN hrn, String layer) {
    ReadEngine readEngine =
        DataEngine.get(DataClientSparkContextUtils.context().actorSystem()).readEngine(hrn);
    QueryApi queryApi =
        DataClient.get(DataClientSparkContextUtils.context().actorSystem()).queryApi(hrn);
    Iterable<Partition> partitions =
        () ->
            await(
                queryApi
                    .getLatestVersion(OptionalLong.empty())
                    .thenApply(
                        v -> v.orElseThrow(() -> new RuntimeException("Output catalog is empty")))
                    .thenCompose(
                        v -> queryApi.getPartitionsAsIterator(v, layer, Collections.emptySet())));
    return StreamSupport.stream(partitions.spliterator(), false)
        .map(p -> new Pair<>(p.getPartition(), await(readEngine.getDataAsBytes(p))))
        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
  }

  // Utility to create an input test catalog given the expected outcome. `expectedNodeCount` is a
  // map containing, for each level 14 tile ID, the number of nodes contained in that sub-tile.
  // This method creates the requested number of topology nodes in each level-14 tile, encodes them
  // in a topology partition and publishes them in a freshly created test catalog.
  private HRN createInputCatalog(Map<Long, Integer> expectedNodeCount) {
    HRN hrn = createCatalog("input", "topology-geometry");
    Map<String, byte[]> partitions =
        expectedNodeCount
            .entrySet()
            .stream()
            .map(
                e -> {
                  BoundingBox boundingBox =
                      HereQuadFactory.INSTANCE.getMapQuadByLongKey(e.getKey()).getBoundingBox();
                  List<TopologyGeometry.Node> nodes =
                      IntStream.rangeClosed(1, e.getValue())
                          .mapToObj(notused -> randomNode(boundingBox))
                          .collect(Collectors.toList());
                  return new Pair<>(e.getKey(), nodes);
                })
            .collect(
                Collectors.groupingBy(
                    p ->
                        HereQuadFactory.INSTANCE
                            .getMapQuadByLongKey(p.getKey())
                            .getAncestor(12)
                            .getLongKey()))
            .entrySet()
            .stream()
            .map(
                e -> {
                  byte[] data =
                      TopologyGeometryPartitionOuterClass.TopologyGeometryPartition.newBuilder()
                          .setPartitionName(String.valueOf(e.getKey()))
                          .addAllNode(
                              e.getValue()
                                  .stream()
                                  .flatMap(p -> p.getValue().stream())
                                  .collect(Collectors.toList()))
                          .build()
                          .toByteArray();
                  return new Pair<>(String.valueOf(e.getKey()), data);
                })
            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
    writeData(hrn, "topology-geometry", partitions);
    return hrn;
  }

  // Utility to create an output catalog for the pipeline.
  private HRN createOutputCatalog() {
    return createCatalog("output", "node-count");
  }

  private <T> T await(CompletionStage<T> stage) {
    try {
      return stage.toCompletableFuture().get();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  // Utility to retrieve the actual node count per level 14 tile in the output catalog. It reads the
  // latest version of the catalog, retrieves the data, decodes it and returns the node count
  // encoded in each level 14 tile. This is used, after a run of the pipeline under test, to verify
  // that the node count is the expected one.
  private Map<Long, Integer> readActualNodeCount(HRN hrn) {
    // We use regular expressions to extract the tileId of the level 14 tiles and their node count.
    // Using a proper JSON parser is left as an exercise.
    Pattern descriptionRegex =
        Pattern.compile("\"description\":\\s*\\{\"tileId\":\\s*(\\d+),\\s*\"count\":\\s*(\\d+)}");
    return readData(hrn, "node-count")
        .entrySet()
        .stream()
        .flatMap(
            e -> {
              String geoJson = new String(e.getValue());
              Matcher matcher = descriptionRegex.matcher(geoJson);
              List<Pair<Long, Integer>> matches = new ArrayList<>();
              while (matcher.find()) {
                matches.add(
                    new Pair<>(
                        Long.parseLong(matcher.group(1)), Integer.parseInt(matcher.group(2))));
              }
              return matches.stream();
            })
        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
  }

  private Map<Long, Integer> initializeExpectedNodeCount() {
    return StreamSupport.stream(
            HereQuadFactory.INSTANCE
                .iterableBoundingBoxToMapQuad(testBoundingBox, 14)
                .spliterator(),
            false)
        .map(quad -> new Pair<>(quad.getLongKey(), Math.abs(random.nextInt()) % 100))
        .filter(pair -> pair.getValue() != 0)
        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
  }

  // Randomly generated expected result. For each level 14 tile in `testBoundingBox` we generate a
  // random non-negative node count. We filter out zeros because those are by design not encoded by
  // the pipeline under test.
  private Map<Long, Integer> expectedNodeCount = initializeExpectedNodeCount();

  @Test
  public void correctNodeCountTest() {
    // Create the input catalog and publish the test data.
    HRN inputHrn = createInputCatalog(expectedNodeCount);
    // Create an empty output catalog.
    HRN outputHrn = createOutputCatalog();

    PipelineContext pipelineContext =
        new PipelineContext(
            new PipelineConfig(outputHrn, Collections.singletonMap("here-map-content", inputHrn)),
            Optional.empty());

    // Run the pipeline under test.
    SparkConnectorLocal.run(pipelineContext, testBoundingBox);

    // Check the output data.
    assertEquals(readActualNodeCount(outputHrn), expectedNodeCount);
  }
}
To run the test with Maven, execute this command:
mvn test -Dspark.master=local[*]
Deploy on the platform
Once you are satisfied with your application, you can build a fat JAR file as shown in the following:
mvn -Pplatform clean package
You can then deploy it via the Pipeline API with no code modifications. For more details on pipeline deployment, see the Pipelines Developer's Guide.
Further information
For additional details on the topics covered in this tutorial, you can refer to the following sources:
For information on the different layer types and configurations, see the Data API documentation
To learn more about interactive querying and manipulation of catalogs, see the OLP CLI documentation and the Data section