Path Matching the Sensor Data to GeoJSON in Spark

Objectives: Use the Location Library to path match sensor data in standalone Spark.

Complexity: Intermediate

Time to complete: 45 min

Dependencies: Organize your work in projects

Source code: Download

This example demonstrates how to do the following:

  • Extract the recorded paths stored in the index layer of the SDII Sensor Data Sample Catalog.
  • Map match each path using the path matching algorithms of the Location Library.
  • Write the map-matched paths (trips) to a GeoJSON layer.

The input catalog contains path coordinates recorded from GPS devices:

Figure 1. Input

Where map matching succeeds, the output catalog contains the same trips matched to the HERE road network:

Figure 2. Output
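Whether a point can be map matched is decided point by point: the application keeps a point only when the path matcher reports an on-road result, and then uses the nearest point on the matched road element (the `OnRoad` case in the application code). The plain-Java sketch below isolates that filtering step. `MatchResult`, `OnRoad`, and `NotOnRoad` are hypothetical stand-ins, not the Location Library's classes.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the on-road filtering step with hypothetical types;
// the Location Library's own MatchResult classes are not used here.
final class OnRoadFilter {

  private OnRoadFilter() {}

  interface MatchResult {}

  // Hypothetical: a point matched to a road, holding the nearest point on the road element.
  static final class OnRoad implements MatchResult {
    final double latitude;
    final double longitude;

    OnRoad(double latitude, double longitude) {
      this.latitude = latitude;
      this.longitude = longitude;
    }
  }

  // Hypothetical: any result that is not on-road (for example, an unmatched point).
  static final class NotOnRoad implements MatchResult {}

  // Keeps one {latitude, longitude} pair per on-road result and drops everything else.
  static List<double[]> keepOnRoad(List<MatchResult> results) {
    List<double[]> matched = new ArrayList<>();
    for (MatchResult result : results) {
      if (result instanceof OnRoad) {
        OnRoad onRoad = (OnRoad) result;
        matched.add(new double[] {onRoad.latitude, onRoad.longitude});
      }
    }
    return matched;
  }
}
```

A path whose points are all off-road therefore ends up empty, which is why some input trips may not appear in the output.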

Set up the Maven Project

Create the following folder structure for the project:

batch-path-matching
└── src
    └── main
        ├── java
        └── scala

You can do this with a single bash command:

mkdir -p batch-path-matching/src/main/{java,scala}

The Maven POM file is similar to that in the Verify Maven Settings example, with the parent POM and dependencies sections updated:

Parent POM:

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-batch-bom</artifactId>
    <version>2.19.10</version>
    <relativePath/>
</parent>

Dependencies:

<dependencies>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.location</groupId>
        <artifactId>location-data-loader-spark_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.location</groupId>
        <artifactId>location-integration-optimized-map_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>spark-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.hrn</groupId>
        <artifactId>hrn_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
</dependencies>

Create the Output Catalog

Create an output catalog for the matched trips using the following config file (output-catalog.json), replacing {{YOUR_CATALOG_ID}} and {{YOUR_USERNAME}} with your own values.

{
  "id": "{{YOUR_CATALOG_ID}}",
  "name": "{{YOUR_USERNAME}} Path Matching Tutorial",
  "summary": "Berlin sample matched path in GeoJSON",
  "description": "Berlin sample matched path in GeoJSON",
  "layers": [
    {
      "id": "matched-trips",
      "name": "GeoJSON matched trips",
      "summary": "GeoJSON matched trips",
      "description": "GeoJSON matched trips",
      "contentType": "application/vnd.geo+json",
      "layerType": "versioned",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "heretile",
        "tileLevels": [14]
      },
      "coverage": {
        "adminAreas": [
          "DE"
        ]
      }
    }
  ]
}
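The `heretile` partitioning with `tileLevels: [14]` means each output partition is named by a HERE tile ID at level 14; the application uses the input's tile ID as the partition name (`tileId.toString`). The sketch below computes such an ID from a coordinate, assuming the HERE tiling scheme as commonly described: at level L each tile spans 360 / 2^L degrees in both directions, columns count from longitude -180 and rows from latitude -90, and the ID interleaves the column and row bits under a leading 1 bit that marks the level. Treat it as an illustration, not as a replacement for the platform's own tiling utilities.

```java
// Sketch: compute a HERE tile ID for a WGS84 coordinate.
// Assumptions (not taken from this tutorial): at level L a tile spans
// 360 / 2^L degrees in longitude and latitude, columns are counted from
// longitude -180, rows from latitude -90, and the ID interleaves the column
// bits (even positions) with the row bits (odd positions) under a leading
// 1 bit that marks the level.
final class HereTileId {

  private HereTileId() {}

  static long tileId(double latitude, double longitude, int level) {
    if (level < 0 || level > 30) {
      throw new IllegalArgumentException("level must be between 0 and 30");
    }
    long tilesPerAxis = 1L << level;
    long column = (long) Math.floor((longitude + 180.0) / 360.0 * tilesPerAxis);
    long row = (long) Math.floor((latitude + 90.0) / 360.0 * tilesPerAxis);
    // Longitude +180 would land one column past the edge; keep it in the last column.
    column = Math.min(column, tilesPerAxis - 1);

    long morton = 0L;
    for (int bit = 0; bit < level; bit++) {
      morton |= ((column >> bit) & 1L) << (2 * bit); // column bit -> even position
      morton |= ((row >> bit) & 1L) << (2 * bit + 1); // row bit -> odd position
    }
    return (1L << (2 * level)) | morton; // level marker bit
  }
}
```

Under these assumptions, a point near (52.52, 13.32) in Berlin falls in level-14 tile 377893756, one of the tile IDs listed in the application's query comments.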

Using {{YOUR_PROJECT_HRN}} from the Organize your work in projects tutorial, create a catalog with a GeoJSON layer partitioned at HERE tile level 14 (the same level as the input catalog):

olp catalog create {{YOUR_CATALOG_ID}} "{{YOUR_USERNAME}} Matched Paths" \
    --config output-catalog.json --scope {{YOUR_PROJECT_HRN}}

The CLI returns the following message:

Catalog {{YOUR_CATALOG_HRN}} has been created.

Implement the Path Matching Application

The main() body contains direct invocations of the Location Library for the path matching logic. The helper methods marshal the input and output data via the Data Client Library.
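Both implementations first reshape the sensor data: each SDII message row contributes one path to its tile, and getSensorData groups the rows by tileId with collect_list, so every SensorData element holds all paths recorded in one tile. The plain-Java sketch below reproduces that grouping without Spark. `Message` is a hypothetical stand-in for an index-layer row, and coordinates are `{latitude, longitude}` arrays rather than `GeoCoordinate`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the reshaping done in getSensorData:
// one row per SDII message (tile ID + one recorded path) becomes
// one entry per tile holding all of that tile's paths.
final class GroupPathsByTile {

  private GroupPathsByTile() {}

  // Hypothetical stand-in for one row of the sensor index layer.
  static final class Message {
    final long tileId;
    final List<double[]> path; // each element is {latitude, longitude}

    Message(long tileId, List<double[]> path) {
      this.tileId = tileId;
      this.path = path;
    }
  }

  // Same idea as groupBy("tileId").agg(collect_list(...)) in the Spark code.
  static Map<Long, List<List<double[]>>> groupByTile(List<Message> messages) {
    Map<Long, List<List<double[]>>> pathsByTile = new LinkedHashMap<>();
    for (Message message : messages) {
      pathsByTile.computeIfAbsent(message.tileId, id -> new ArrayList<>()).add(message.path);
    }
    return pathsByTile;
  }
}
```

One difference: after a shuffle, Spark's collect_list does not guarantee the order of the collected paths, while this sketch keeps input order. Path order within a tile only affects the order of features in the output GeoJSON.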

Scala
Java

/*
 * Copyright (c) 2018-2020 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import akka.actor.ActorSystem
import com.here.hrn.HRN
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.model.VersionDependency
import com.here.platform.data.client.scaladsl.{CommitPartition, DataClient, NewPartition}
import com.here.platform.data.client.spark.DataClientSparkContextUtils
import com.here.platform.data.client.spark.LayerDataFrameReader._
import com.here.platform.data.client.spark.SparkSupport._
import com.here.platform.location.core.geospatial.GeoCoordinate
import com.here.platform.location.core.mapmatching.OnRoad
import com.here.platform.location.dataloader.core.caching.CacheManager
import com.here.platform.location.dataloader.spark.SparkCatalogFactory
import com.here.platform.location.integration.optimizedmap.mapmatching.PathMatchers
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{udf, _}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Read sensor data from an SDII archive and output the map-matched paths
// to a GeoJSON output catalog with the same tiling scheme.
object BatchPathMatchingScala {

  private val pipelineContext = new PipelineContext

  private val outputCatalogHrn = pipelineContext.config.outputCatalog
  private val outputLayer = "matched-trips"

  private val sensorDataArchiveHrn = pipelineContext.config.inputCatalogs("sensor")
  private val sensorDataArchiveVersionLayerName = "sample-index-layer"

  private val locationCatalogHrn = pipelineContext.config.inputCatalogs("location")
  private val locationCatalogVersion = pipelineContext.job.get.inputCatalogs("location").version

  val sparkSession: SparkSession =
    SparkSession.builder().appName("BatchPathMatchingPipeline").getOrCreate()

  import sparkSession.implicits._

  def main(args: Array[String]): Unit = {

    val sensorData: Dataset[SensorData] = getSensorData

    // Set up the location library catalog factory and cache used by the path matcher.
    val locationLibraryCatalogFactory = new SparkCatalogFactory
    val locationLibraryCatalog =
      locationLibraryCatalogFactory.create(locationCatalogHrn, locationCatalogVersion)
    val locationLibraryCache = CacheManager.withLruCache

    // Map match each path inside each sensor archive tile (https://en.wikipedia.org/wiki/Map_matching)
    // and return a list of lists of coordinates.
    // Each element in the top-level list represents a path as a list of coordinates.
    val matchedPaths: Dataset[(Long, Seq[Seq[GeoCoordinate]])] =
      sensorData.mapPartitions { sensorDataIterator =>
        val pathMatcher =
          PathMatchers.carPathMatcher[GeoCoordinate](locationLibraryCatalog, locationLibraryCache)
        sensorDataIterator.map { sensorDataElement =>
          sensorDataElement.tileId ->
            sensorDataElement.positionEstimateList
              .map {
                pathMatcher
                  .matchPath(_)
                  .results
                  .flatMap {
                    case OnRoad(matched) =>
                      Some(GeoCoordinate(matched.nearest.latitude, matched.nearest.longitude))
                    case _ => None
                  }
              }
        }
      }

    publish(convertToGeoJSON(matchedPaths))

  }

  // Retrieves the contents of the sensor data archive partitions.
  private def getSensorData: Dataset[SensorData] = {

    // Extracting input data.
    // There are two options for the query parameter: `hour_from` and `tile_id`.
    // Use `hour_from>0` to get all available messages, or
    // `tile_id==$tile_id` to get the messages of a specific partition.
    // Possible tile_ids = [377893756, 377894443, 377894442, ...].
    // For CN environment = [389695267, 389696772, 389695401, ...].
    // You can implement more complex queries with RSQL
    // (https://developer.here.com/olp/documentation/data-client-library/dev_guide/client/rsql.html)
    val sdiiMessages: DataFrame = sparkSession
      .readLayer(sensorDataArchiveHrn, sensorDataArchiveVersionLayerName)
      .query("hour_from>0")
      .load()

    val sensorData = sdiiMessages
      .select($"idx_tile_id" as "tileId", $"path.positionEstimate" as "fullPositionEstimate")
      .withColumn("positionEstimate", convertToGeoCoordinate($"fullPositionEstimate"))
      .groupBy("tileId")
      .agg(collect_list("positionEstimate") as "positionEstimateList")
      .as[SensorData]

    sensorData
  }

  val convertToGeoCoordinate: UserDefinedFunction = udf((positionEstimateRows: Seq[Row]) => {
    positionEstimateRows.map { positionEstimate =>
      val lat = positionEstimate.getAs[Double]("latitude_deg")
      val lon = positionEstimate.getAs[Double]("longitude_deg")
      GeoCoordinate(lat, lon)
    }
  })

  // Publish the GeoJSON partitions to the output catalog using Data Client Library.
  private def publish(geoJsonByTile: Dataset[(Long, String)]): Unit = {
    val masterActorSystem: ActorSystem =
      DataClientSparkContextUtils.context.actorSystem

    // Start commit on master.
    val masterPublishApi = DataClient(masterActorSystem).publishApi(outputCatalogHrn)

    // Get the latest catalog version of the output catalog.
    val baseVersion = masterPublishApi.getBaseVersion().awaitResult()

    // Fill in the direct and indirect dependencies for the output catalog, given the direct inputs.
    // This is good practice, especially if you intend to use the scheduler to
    // determine when the pipeline should run (that is, by using the --with-scheduler option
    // when creating the pipeline version). See the CLI reference:
    // For platform.here.com users:
    // https://developer.here.com/olp/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create
    // For platform.hereolp.cn users:
    // https://developer.here.com/olp/cn/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create
    val dependencies =
      gatherDependencies(locationCatalogHrn, locationCatalogVersion)

    // Start a publication batch on top of most recent catalog version.
    val batchToken = masterPublishApi.startBatch(baseVersion, dependencies).awaitResult()

    // Send partitions to workers, and upload data and metadata.
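    // The explicit parameter type selects the Scala Iterator overload of foreachPartition.
    geoJsonByTile.foreachPartition({ partitions: Iterator[(Long, String)] =>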
      val workerActorSystem = DataClientSparkContextUtils.context.actorSystem
      val workerPublishApi = DataClient(workerActorSystem).publishApi(outputCatalogHrn)
      val workerWriteEngine = DataEngine(workerActorSystem).writeEngine(outputCatalogHrn)
      val committedPartitions: Iterator[CommitPartition] =
        partitions.map {
          case (tileId: Long, geoJson: String) =>
            val newPartition =
              NewPartition(
                partition = tileId.toString,
                layer = outputLayer,
                data = NewPartition.ByteArrayData(geoJson.getBytes(java.nio.charset.StandardCharsets.UTF_8))
              )
            workerWriteEngine.put(newPartition).awaitResult()
        }
      workerPublishApi
        .publishToBatch(batchToken, committedPartitions)
        .awaitResult()
    })
    masterPublishApi.completeBatch(batchToken).awaitResult()
    sparkSession.stop()
  }

  // Convert the trips to GeoJSON; each trip becomes a LineString Feature in one FeatureCollection per tile.
  private def convertToGeoJSON(
      matchedTrips: Dataset[(Long, Seq[Seq[GeoCoordinate]])]): Dataset[(Long, String)] =
    matchedTrips.map { matchedTrip =>
      val features = matchedTrip._2
        .map(
          coordinates =>
            """{ "type": "Feature", "geometry": """ +
              """{ "type": "LineString", "coordinates": """ +
              coordinates.map(c => s"[${c.longitude}, ${c.latitude}]").mkString("[", ",", "]")
              + "}" + """, "properties": {} }""")
      val featureCollection =
        features.mkString("""{ "type": "FeatureCollection", "features": [""", ",", "]}")
      (matchedTrip._1, featureCollection)
    }

  // Gather the dependencies for the output catalog which depends on the location library catalog.
  private def gatherDependencies(locationHrn: HRN,
                                 locationVersion: Long): Seq[VersionDependency] = {
    val sparkActorSystem = DataClientSparkContextUtils.context.actorSystem
    val locationLibraryQuery = DataClient(sparkActorSystem).queryApi(locationHrn)
    val locationLibraryDeps =
      locationLibraryQuery.getVersion(locationVersion).awaitResult().dependencies

    // The output catalog depends directly on the location library
    // catalog, and indirectly on its respective dependencies.
    locationLibraryDeps.map(_.copy(direct = false)) ++
      Seq(VersionDependency(locationHrn, locationVersion, direct = true))
  }
  case class SensorData(tileId: Long, positionEstimateList: Seq[Seq[GeoCoordinate]])
}



/*
 * Copyright (c) 2018-2020 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;
import static org.apache.spark.sql.functions.udf;

import akka.actor.ActorSystem;
import akka.japi.Pair;
import com.here.hrn.HRN;
import com.here.platform.data.client.engine.javadsl.DataEngine;
import com.here.platform.data.client.engine.javadsl.WriteEngine;
import com.here.platform.data.client.javadsl.CommitPartition;
import com.here.platform.data.client.javadsl.DataClient;
import com.here.platform.data.client.javadsl.NewPartition;
import com.here.platform.data.client.javadsl.PublishApi;
import com.here.platform.data.client.javadsl.QueryApi;
import com.here.platform.data.client.model.BatchToken;
import com.here.platform.data.client.model.VersionDependency;
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import com.here.platform.location.core.geospatial.GeoCoordinate;
import com.here.platform.location.core.mapmatching.MatchResult;
import com.here.platform.location.core.mapmatching.NoTransition;
import com.here.platform.location.core.mapmatching.OnRoad;
import com.here.platform.location.core.mapmatching.javadsl.MatchResults;
import com.here.platform.location.core.mapmatching.javadsl.PathMatcher;
import com.here.platform.location.dataloader.core.Catalog;
import com.here.platform.location.dataloader.core.caching.CacheManager;
import com.here.platform.location.dataloader.spark.SparkCatalogFactory;
import com.here.platform.location.inmemory.graph.Vertex;
import com.here.platform.location.integration.optimizedmap.mapmatching.javadsl.PathMatchers;
import com.here.platform.pipeline.PipelineContext;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.StringJoiner;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import scala.collection.JavaConversions;
import scala.collection.mutable.WrappedArray;

// Read sensor data from an SDII archive and output the map-matched paths
// to a GeoJSON output catalog with the same tiling scheme.
public class BatchPathMatchingJava {

  private static PipelineContext pipelineContext = new PipelineContext();

  private static HRN outputCatalogHrn = pipelineContext.getConfig().getOutputCatalog();
  private static String outputLayer = "matched-trips";

  private static HRN sensorDataArchiveHrn =
      pipelineContext.getConfig().getInputCatalogs().get("sensor");
  private static String sensorDataArchiveVersionLayerName = "sample-index-layer";

  private static HRN locationCatalogHrn =
      pipelineContext.getConfig().getInputCatalogs().get("location");
  private static Long locationCatalogVersion =
      pipelineContext.getJob().get().getInputCatalogs().get("location").version();

  private static SparkSession sparkSession =
      SparkSession.builder().appName("BatchPathMatchingPipeline").getOrCreate();

  public static void main(String[] args) throws InterruptedException, ExecutionException {

    Dataset<SensorData> sensorData = getSensorData();

    // Set up the location library catalog factory and cache used by the path matcher.
    SparkCatalogFactory locationLibraryCatalogFactory =
        new SparkCatalogFactory(scala.concurrent.duration.Duration.Inf());
    Catalog locationLibraryCatalog =
        locationLibraryCatalogFactory.create(locationCatalogHrn, locationCatalogVersion);
    CacheManager locationLibraryCache = CacheManager.withLruCache();

    // Map match each path inside each sensor archive tile
    // (https://en.wikipedia.org/wiki/Map_matching)
    // and return, for each tile, a list of lists of on-road GeoCoordinates.
    // Each element in the top-level list represents a path as a list of coordinates.
    JavaRDD<Pair<Long, List<List<GeoCoordinate>>>> matchedPaths =
        sensorData
            .javaRDD()
            .mapPartitions(
                tiles -> {
                  PathMatcher<GeoCoordinate, Vertex, NoTransition> pathMatcher =
                      PathMatchers.carPathMatcher(locationLibraryCatalog, locationLibraryCache);
                  Iterable<SensorData> tileIterable = () -> tiles;
                  return StreamSupport.stream(tileIterable.spliterator(), false)
                      .map(
                          sensorDataElement ->
                              new Pair<>(
                                  sensorDataElement.getTileId(),
                                  getCoordinatesMatchedToPath(
                                      pathMatcher, sensorDataElement.getPositionEstimateList())))
                      .iterator();
                });

    publish(convertToGeoJSON(matchedPaths));
  }

  // Match each path with the path matcher and keep only its on-road points.
  // Returns one list of GeoCoordinates per path.
  private static List<List<GeoCoordinate>> getCoordinatesMatchedToPath(
      PathMatcher<GeoCoordinate, Vertex, NoTransition> pathMatcher,
      List<List<String>> positionEstimatePathList) {
    return positionEstimatePathList
        .stream()
        .map(BatchPathMatchingJava::convertToGeoCoordinates)
        .map(
            positionEstimatePath ->
                pathMatcher
                    .matchPath(positionEstimatePath)
                    .results()
                    .stream()
                    .map(BatchPathMatchingJava::matchResultOnRoad)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList()))
        .collect(Collectors.toList());
  }

  // Convert "lat,lon" strings into GeoCoordinates.
  private static List<GeoCoordinate> convertToGeoCoordinates(List<String> coordinatePairs) {
    return coordinatePairs
        .stream()
        .map(
            coordinatePair -> {
              String[] coordinates = coordinatePair.split(",", 2);
              double latitude = Double.parseDouble(coordinates[0]);
              double longitude = Double.parseDouble(coordinates[1]);
              return new GeoCoordinate(latitude, longitude);
            })
        .collect(Collectors.toList());
  }

  // Construct new GeoCoordinates for on-road matches.
  private static GeoCoordinate matchResultOnRoad(MatchResult<Vertex> matchResult) {
    if (MatchResults.isOnRoad(matchResult)) {
      OnRoad<Vertex> onRoad = (OnRoad<Vertex>) matchResult;
      GeoCoordinate nearest = onRoad.elementProjection().nearest();
      return new GeoCoordinate(nearest.latitude(), nearest.longitude());
    }
    return null;
  }

  // Retrieves the contents of the sensor data archive partitions.
  private static Dataset<SensorData> getSensorData() {

    // Extracting input data.
    // There are two options for the query parameter: `hour_from` and `tile_id`.
    // Use `hour_from>0` to get all available messages, or
    // `tile_id==$tile_id` to get the messages of a specific partition.
    // Possible tile_ids = [377893756, 377894443, 377894442, ...].
    // For CN environment = [389695267, 389696772, 389695401, ...].
    // You can implement more complex queries with RSQL
    // (https://developer.here.com/olp/documentation/data-client-library/dev_guide/client/rsql.html)
    Dataset<Row> sdiiMessages =
        JavaLayerDataFrameReader.create(sparkSession)
            .readLayer(sensorDataArchiveHrn, sensorDataArchiveVersionLayerName)
            .query("hour_from>0")
            .load();

    // Return type of the UDF below: an array of "lat,lon" strings, which Spark uses to encode the result.
    ArrayType schema = DataTypes.createArrayType(DataTypes.StringType);

    // UDF that extracts each position estimate's latitude and longitude as a "lat,lon" string.
    UserDefinedFunction positionEstimateToCoordinatePair =
        udf(
            (UDF1<WrappedArray<Row>, List<String>>)
                positionEstimateList ->
                    JavaConversions.seqAsJavaList(positionEstimateList)
                        .stream()
                        .map(
                            positionEstimate -> {
                              Double lat = positionEstimate.getAs("latitude_deg");
                              Double lon = positionEstimate.getAs("longitude_deg");
                              return lat + "," + lon;
                            })
                        .collect(Collectors.toList()),
            schema);

    Encoder<SensorData> sensorDataEncoder = Encoders.bean(SensorData.class);
    // Extract list of paths (each path represented as a list of coordinates)
    return sdiiMessages
        .select(
            col("idx_tile_id").as("tileId"),
            col("path.positionEstimate").as("fullPositionEstimate"))
        .withColumn(
            "positionEstimate", positionEstimateToCoordinatePair.apply(col("fullPositionEstimate")))
        .groupBy("tileId")
        .agg(collect_list("positionEstimate").as("positionEstimateList"))
        .as(sensorDataEncoder);
  }

  // Publish the GeoJSON partitions to the output catalog using Data Client Library.
  private static void publish(JavaRDD<Pair<Long, String>> geoJsonByTile)
      throws InterruptedException, ExecutionException {
    ActorSystem masterActorSystem = DataClientSparkContextUtils.context().actorSystem();

    // Start commit on master.
    PublishApi masterPublishApi = DataClient.get(masterActorSystem).publishApi(outputCatalogHrn);

    // Get the latest catalog version of the output catalog.
    OptionalLong baseVersion = masterPublishApi.getBaseVersion().toCompletableFuture().get();

    // Fill in the direct and indirect dependencies for the output catalog, given the direct inputs.
    // This is good practice, especially if you intend to use the scheduler to
    // determine when the pipeline should run (that is, by using the --with-scheduler option
    // when creating the pipeline version). See the CLI reference:
    // For platform.here.com users:
    // https://developer.here.com/olp/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create
    // For platform.hereolp.cn users:
    // https://developer.here.com/olp/cn/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create
    List<VersionDependency> dependencies =
        gatherDependencies(locationCatalogHrn, locationCatalogVersion);

    // Start a publication batch on top of most recent catalog version.
    BatchToken batchToken =
        masterPublishApi.startBatch(baseVersion, dependencies).toCompletableFuture().get();

    // Send partitions to workers, and upload data and metadata.
    geoJsonByTile.foreachPartition(
        partitions -> {
          ActorSystem workerActorSystem = DataClientSparkContextUtils.context().actorSystem();
          PublishApi workerPublishApi =
              DataClient.get(workerActorSystem).publishApi(outputCatalogHrn);
          WriteEngine workerWriteEngine =
              DataEngine.get(workerActorSystem).writeEngine(outputCatalogHrn);
          ArrayList<CommitPartition> commitPartitions = new ArrayList<>();

          while (partitions.hasNext()) {
            Pair<Long, String> content = partitions.next();
            NewPartition newPartition =
                new NewPartition.Builder()
                    .withPartition(content.first().toString())
                    .withLayer(outputLayer)
                    .withData(content.second().getBytes(java.nio.charset.StandardCharsets.UTF_8))
                    .build();
            commitPartitions.add(workerWriteEngine.put(newPartition).toCompletableFuture().join());
          }

          workerPublishApi
              .publishToBatch(batchToken, commitPartitions.iterator())
              .toCompletableFuture()
              .join();
        });

    // Complete the commit.
    masterPublishApi.completeBatch(batchToken).toCompletableFuture().join();
    sparkSession.stop();
  }

  // Convert the trips to GeoJSON; each trip becomes a LineString Feature in one FeatureCollection per tile.
  private static JavaRDD<Pair<Long, String>> convertToGeoJSON(
      JavaRDD<Pair<Long, List<List<GeoCoordinate>>>> matchedTrips) {
    return matchedTrips.map(
        matchedTrip -> {
          String featureCollection =
              matchedTrip
                  .second()
                  .stream()
                  .map(BatchPathMatchingJava::mapToFeature)
                  .collect(
                      Collectors.joining(
                          ",", "{ \"type\": \"FeatureCollection\", \"features\": [", "]}"));
          return new Pair<>(matchedTrip.first(), featureCollection);
        });
  }

  // Convert one path to a GeoJSON LineString Feature.
  private static String mapToFeature(List<GeoCoordinate> coordinates) {
    StringJoiner coordJoiner = new StringJoiner(",", "[", "]");
    coordinates
        .stream()
        .map(c -> "[" + c.getLongitude() + ", " + c.getLatitude() + "]")
        .forEach(coordJoiner::add);
    return "{ \"type\": \"Feature\", \"geometry\": "
        + "{ \"type\": \"LineString\", \"coordinates\": "
        + coordJoiner.toString()
        + "}"
        + ", \"properties\": {} }";
  }

  // Gather the dependencies for the output catalog which depends on the location library catalog.
  private static List<VersionDependency> gatherDependencies(HRN locationHrn, Long locationVersion) {
    ActorSystem sparkActorSystem = DataClientSparkContextUtils.context().actorSystem();
    QueryApi locationLibraryQuery = DataClient.get(sparkActorSystem).queryApi(locationHrn);
    List<VersionDependency> locationDeps =
        locationLibraryQuery
            .getVersion(locationVersion)
            .toCompletableFuture()
            .join()
            .getDependencies();

    List<VersionDependency> retval =
        locationDeps
            .stream()
            .map(dep -> new VersionDependency(dep.hrn(), dep.version(), false))
            .collect(Collectors.toList());
    retval.add(new VersionDependency(locationHrn, locationVersion, true));
    return retval;
  }

  /**
   * Class used for encoding data in a Dataset. Holds a tile ID and its paths: each inner list is
   * one path, and each string is a "lat,lon" coordinate pair.
   */
  public static class SensorData implements Serializable {

    private Long tileId;
    private List<List<String>> positionEstimateList;

    public Long getTileId() {
      return tileId;
    }

    public void setTileId(Long tileId) {
      this.tileId = tileId;
    }

    public List<List<String>> getPositionEstimateList() {
      return positionEstimateList;
    }

    public void setPositionEstimateList(List<List<String>> positionEstimateList) {
      this.positionEstimateList = positionEstimateList;
    }
  }
}
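Both versions of convertToGeoJSON write every matched path as a LineString, including paths where fewer than two points were matched on-road. RFC 7946 requires a LineString to have two or more positions, so such features are not valid GeoJSON. The standalone sketch below shows one way to skip them; it uses `{latitude, longitude}` arrays instead of `GeoCoordinate`, and the length guard is an addition, not part of the tutorial code.

```java
import java.util.List;
import java.util.StringJoiner;

// Sketch of convertToGeoJSON with one extra guard: RFC 7946 requires a
// LineString to have two or more positions, so paths with fewer than two
// on-road points are left out of the FeatureCollection.
// Input paths use {latitude, longitude} arrays; GeoJSON positions are
// written as [longitude, latitude].
final class MatchedPathsToGeoJson {

  private MatchedPathsToGeoJson() {}

  static String toFeatureCollection(List<List<double[]>> paths) {
    StringJoiner features =
        new StringJoiner(",", "{\"type\":\"FeatureCollection\",\"features\":[", "]}");
    for (List<double[]> path : paths) {
      if (path.size() < 2) {
        continue; // too short to be a valid LineString
      }
      StringJoiner positions = new StringJoiner(",", "[", "]");
      for (double[] latLon : path) {
        positions.add("[" + latLon[1] + "," + latLon[0] + "]"); // longitude first
      }
      features.add(
          "{\"type\":\"Feature\",\"geometry\":{\"type\":\"LineString\",\"coordinates\":"
              + positions
              + "},\"properties\":{}}");
    }
    return features.toString();
  }
}
```

A tile whose paths are all too short still produces an empty FeatureCollection, which remains valid GeoJSON.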


Declare the Catalog Inputs to the Path Matching Application

The following pipeline configuration files declare the catalog inputs to the path matching application.

In pipeline-config.conf, replace {{YOUR_CATALOG_HRN}} with the HRN of the output catalog you created.

pipeline-config.conf

pipeline.config {

  output-catalog {hrn = "{{YOUR_CATALOG_HRN}}"}

  input-catalogs {
    // In the China environment, use hrn:here-cn:data::olp-cn-here:here-optimized-map-for-location-library-china-2
    location {hrn = "hrn:here:data::olp-here:here-optimized-map-for-location-library-2"}
    // In the China environment, use hrn:here-cn:data::olp-cn-here:sample-data
    sensor {hrn = "hrn:here:data::olp-here:olp-sdii-sample-berlin-2"}
  }
}

The application uses the SDII Sensor Data Sample Catalog and the HERE Optimized Map for Location Library Catalog as input catalogs, so you need to link them to your project. To do this, replace {{YOUR_PROJECT_HRN}} with the HRN of your project in the following commands and run them:


olp project resources link {{YOUR_PROJECT_HRN}} hrn:here:data::olp-here:olp-sdii-sample-berlin-2
olp project resources link {{YOUR_PROJECT_HRN}} hrn:here:data::olp-here:here-optimized-map-for-location-library-2

The CLI should return the following messages:


Project resource hrn:here:data::olp-here:olp-sdii-sample-berlin-2 has been linked.
Project resource hrn:here:data::olp-here:here-optimized-map-for-location-library-2 has been linked.

pipeline-job.conf

// NOTE: If you are running the application in a pipeline with the scheduler,
// i.e., using the --with-scheduler option when creating the pipeline version
// (https://developer.here.com/olp/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create)
// then this file is not needed, as the scheduler will pick up the latest versions
// of the input catalogs.
pipeline.job.catalog-versions {

  output-catalog {base-version = -1}

  input-catalogs {
    location {
      processing-type = "reprocess"
      version = 1
    }
  }
}

For more information on pipeline configuration files, see the Pipeline API documentation.
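The input versions declared in these files also feed gatherDependencies in the application: the output catalog version records the location catalog version as a direct dependency and that catalog's own dependencies as indirect ones. The sketch below isolates that rule. `Dependency` is a hypothetical stand-in for the Data Client Library's `VersionDependency`.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the rule applied by gatherDependencies.
// Dependency is a hypothetical stand-in for VersionDependency.
final class DependencySketch {

  private DependencySketch() {}

  static final class Dependency {
    final String hrn;
    final long version;
    final boolean direct;

    Dependency(String hrn, long version, boolean direct) {
      this.hrn = hrn;
      this.version = version;
      this.direct = direct;
    }
  }

  // The input catalog becomes a direct dependency; its own dependencies
  // are carried over as indirect ones.
  static List<Dependency> outputDependencies(
      String inputHrn, long inputVersion, List<Dependency> dependenciesOfInput) {
    List<Dependency> result = new ArrayList<>();
    for (Dependency dependency : dependenciesOfInput) {
      result.add(new Dependency(dependency.hrn, dependency.version, false));
    }
    result.add(new Dependency(inputHrn, inputVersion, true));
    return result;
  }
}
```

Marking the transitive entries as indirect lets tooling such as the scheduler tell which catalogs the pipeline reads itself.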

Compile and Run Locally

To run the application locally, execute the following command:

Scala
Java

mvn compile exec:java \
    -Dexec.mainClass=BatchPathMatchingScala \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dspark.master=local[*] \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
    

mvn compile exec:java \
    -Dexec.mainClass=BatchPathMatchingJava \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dspark.master=local[*] \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
    

Further Information

For more details on the capabilities and architecture of the Location Library, see the Developer's Guide. The Code Examples page also lists more detailed code examples using the Location Library.

You can also build this simple path matching application as a fat JAR file:

mvn -Pplatform clean package

and deploy it via the Pipeline API. For more details on pipeline deployment, see the Pipeline User Guide.
