Use Spark Connector to read and write data
Objectives: Understand how to use the Spark Connector to read and write data from different layers and data formats in a catalog.
Complexity: Beginner
Time to complete: 30 min
Prerequisites: Organize your work in projects
Source code: Download
This tutorial demonstrates how to use the Spark Connector provided by the Data Client Library. The connector integrates with Spark for batch processing workloads, so you can use all standard Spark APIs and functions to read, write, and delete data. For stream processing workloads, use the Flink Connector instead.
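Before diving into the full applications, the following sketch shows the basic read/write pattern the connector exposes in Scala. The catalog HRNs and layer IDs here are placeholders for illustration only; the applications below resolve real values from the pipeline configuration:
import com.here.hrn.HRN
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder().appName("SparkConnectorSketch").getOrCreate()

// Read a versioned layer into a DataFrame; the connector derives the format
// (Avro, Protobuf, Parquet, ...) from the layer configuration.
val someData = sparkSession
  .readLayer(HRN.fromString("{{SOME_CATALOG_HRN}}"), "some-layer-id")
  .query("mt_partition=in=(23618361, 23611420)")
  .load()

// Write the DataFrame to another layer; for custom formats a data converter
// can be supplied, as shown later in this tutorial.
someData
  .writeLayer(HRN.fromString("{{SOME_OUTPUT_CATALOG_HRN}}"), "some-output-layer-id")
  .save()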
In the main part of the tutorial, the following usages are covered:
- Reading, printing, and collecting data from the index layer in the Avro format (note that the Spark Connector infers the format, so it does not need to be specified)
- Reading data from the versioned layer in the Protobuf format, then transforming it and writing it to the volatile layer in the Parquet format
- Reading, filtering, augmenting, and writing data from a versioned layer in a custom format
As a preparation step, you must create your data sources, including a catalog with the appropriate layers, so that they are in place for the main part of this tutorial. The dataset used is sourced from the HERE Map Content catalog and contains information on road network topology and geometry.
Set up the Maven project
Create the following folder structure for the project:
spark-connector
└── src
    └── main
        ├── java
        ├── resources
        └── scala
You can do this with a single bash command:
mkdir -p spark-connector/src/main/{java,resources,scala}
Create a file named pipeline-config.conf and populate it with the following contents, replacing {{YOUR_CATALOG_HRN}} and {{YOUR_OUTPUT_CATALOG_HRN}} with the HRNs of the catalogs you created in Organize your work in projects.
pipeline.config {
  output-catalog { hrn = "{{YOUR_OUTPUT_CATALOG_HRN}}" }
  input-catalogs {
    // Use hrn:here-cn:data::olp-cn-here:here-map-content-china-2 in the China environment
    roadTopologySource { hrn = "hrn:here:data::olp-here:rib-2" }
    roadTopologyInput { hrn = "{{YOUR_CATALOG_HRN}}" }
  }
}
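At runtime, the applications in this tutorial do not hard-code these HRNs; they resolve the entries above through the PipelineContext. A minimal sketch of that lookup (the same calls appear in the full applications below):
import com.here.platform.pipeline.PipelineContext

// Reads pipeline-config.conf and resolves catalogs by their symbolic names.
val pipelineContext = new PipelineContext

val roadTopologySourceHrn = pipelineContext.config.inputCatalogs("roadTopologySource")
val roadTopologyInputHrn = pipelineContext.config.inputCatalogs("roadTopologyInput")
val outputCatalogHrn = pipelineContext.config.outputCatalog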
This tutorial uses a public catalog, the HERE Map Content catalog. Before the catalog can be used within your project, it must be linked to the project. To do this, replace {{YOUR_PROJECT_HRN}} with the HRN of the project you created in the Organize your work in projects tutorial and execute the following command:
olp project resource link {{YOUR_PROJECT_HRN}} hrn:here:data::olp-here:rib-2
If your command is successful, the CLI returns the following message:
Project resource hrn:here:data::olp-here:rib-2 has been linked.
The POM for this example is identical to that in the Path Matching in Spark Tutorial.
Parent POM:
<parent>
<groupId>com.here.platform</groupId>
<artifactId>sdk-batch-bom_2.12</artifactId>
<version>2.54.3</version>
<relativePath/>
</parent>
Dependencies:
<dependencies>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>pipeline-interface_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>spark-support_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.hrn</groupId>
<artifactId>hrn_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
</dependencies>
You must create both the input and output catalogs. To do so, follow the steps outlined in Organize your work in projects, using the OLP Command Line Interface (CLI). Use a unique identifier for the catalog, for example {{YOUR_USERNAME}}-spark-connector-input.
Create a file called spark-connector-input.json with the contents below, replacing {{YOUR_INPUT_CATALOG_ID}} with the chosen identifier.
Note
All timestamps are in UTC milliseconds since the epoch (00:00:00 UTC, January 1, 1970). If you run your application in another time zone, make sure that timestamps are converted to UTC before you query or upload data. In Java or Scala, you can do the conversion with this call: Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis()
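For example, a minimal Scala use of that call when preparing a value for the timewindow index defined in the layer below:
import java.util.{Calendar, TimeZone}

// Current time as UTC milliseconds since the epoch, suitable for the
// ingestion_timestamp timewindow index.
val utcMillis: Long = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis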
{
"id": "spark-connector-input",
"name": "Simulated road topology data archive (From tutorial) spark-connector-input",
"summary": "Archive of simulated road topology data",
"description": "Archive of simulated road topology data.",
"tags": ["Tutorial", "Simulated"],
"layers": [
{
"id": "index-layer-avro-data",
"name": "index-layer-avro-data",
"summary": "Simulated data.",
"description": "Simulated road topology data.\n\nThis data is meant to be used for demonstration purposes and \"playing\" with data.",
"tags": ["Tutorial", "Simulated"],
"contentType": "application/x-avro-binary",
"layerType": "index",
"indexProperties": {
"indexDefinitions": [
{
"name": "ingestion_timestamp",
"duration": 600000,
"type": "timewindow"
},
{
"name": "tile_id",
"type": "heretile",
"zoomLevel": 12
}
],
"ttl": "unlimited"
},
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "generic"
}
},
{
"id": "versioned-layer-protobuf-data",
"name": "versioned-layer-protobuf-data",
"summary": "Simulated data.",
"description": "Simulated road topology data for versioned-layer-protobuf-data",
"contentType": "application/x-protobuf",
"layerType": "versioned",
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "generic"
},
"schema": {
"hrn": "hrn:here:schema::olp-here:com.here.schema.rib:topology-geometry_v2:2.41.0"
}
},
{
"id": "versioned-layer-custom-data",
"name": "versioned-layer-custom-data",
"summary": "Simulated data.",
"description": "Simulated road topology data for versioned-layer-custom-data",
"contentType": "application/octet-stream",
"layerType": "versioned",
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "generic"
}
}
]
}
For the output catalog, name the file spark-connector-output.json and populate it with the following contents:
{
"id": "spark-connector-output",
"name": "Simulated data archive (From tutorial) test-spark-connector-output",
"summary": "Archive of simulated road topology data",
"description": "Archive of simulated road topology data.",
"tags": ["Tutorial", "Simulated"],
"layers": [
{
"id": "volatile-layer-parquet-data",
"name": "volatile-layer-parquet-data",
"summary": "Simulated data.",
"description": "Simulated road topology data.",
"contentType": "application/x-parquet",
"layerType": "volatile",
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "generic"
}
},
{
"id": "versioned-layer-custom-data",
"name": "versioned-layer-custom-data",
"summary": "Simulated data.",
"description": "Simulated road topology data for versioned-layer-custom-data",
"contentType": "application/octet-stream",
"layerType": "versioned",
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "generic"
}
}
]
}
Replace {{YOUR_CATALOG_ID}} with your own identifier and {{YOUR_PROJECT_HRN}} with the HRN of your project from Organize your work in projects, then run the following command:
olp catalog create {{YOUR_CATALOG_ID}} \
"Simulated road topology data input from tutorial ({{YOUR_USERNAME}})" \
--config spark-connector-input.json \
--scope {{YOUR_PROJECT_HRN}}
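Create the output catalog in the same way, pointing to the output configuration file. For example (the output catalog identifier and description below are placeholders, not values defined elsewhere in this tutorial):
olp catalog create {{YOUR_OUTPUT_CATALOG_ID}} \
"Simulated road topology data output from tutorial ({{YOUR_USERNAME}})" \
--config spark-connector-output.json \
--scope {{YOUR_PROJECT_HRN}}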
Note
If a billing tag is required in your realm, update the config file by adding the billingTags: ["YOUR_BILLING_TAG"] property to the layer section.
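For example, a layer entry with a billing tag added would look like the following fragment (the tag value is a placeholder; other layer properties are unchanged):
{
  "id": "versioned-layer-custom-data",
  "name": "versioned-layer-custom-data",
  "billingTags": ["YOUR_BILLING_TAG"],
  "contentType": "application/octet-stream",
  "layerType": "versioned",
  "volume": { "volumeType": "durable" },
  "partitioning": { "scheme": "generic" }
}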
Set up the data sources
This step involves implementing an application that sets up the data sources used in the main part of this tutorial. The Spark Connector is used here to read road topology and geometry data from a source catalog and to write it to the catalog that serves as input in the next stage.
For this purpose, you read the data from a dataset containing road topology and geometry data and save it to three data sources, each corresponding to one of the layer types and data formats defined above when creating the input catalog.
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Row, SparkSession}
import com.here.platform.data.client.spark.scaladsl.{
GroupedData,
VersionedDataConverter,
VersionedRowMetadata
}
import org.slf4j.LoggerFactory
object DataSourcesSetupScala extends App {
private val logger = LoggerFactory.getLogger(DataSourcesSetupScala.getClass)
private val pipelineContext = new PipelineContext
private val roadTopologyDataSourceCatalogHrn =
pipelineContext.config.inputCatalogs("roadTopologySource")
private val roadTopologyDataSourceLayerId = "topology-geometry"
private val inputRoadTopologyCatalogHrn =
pipelineContext.config.inputCatalogs("roadTopologyInput")
private val inputIndexAvroLayerId = "index-layer-avro-data"
private val inputVersionedProtobufLayerId = "versioned-layer-protobuf-data"
private val inputVersionedLayerCustomId = "versioned-layer-custom-data"
val sparkSession: SparkSession =
SparkSession.builder().appName("DataSourcesSetupScalaPipeline").getOrCreate()
import sparkSession.implicits._
private val berlinPartition = "23618361"
private val munichPartition = "23611420"
val sourceRoadTopologyData = sparkSession
.readLayer(roadTopologyDataSourceCatalogHrn, roadTopologyDataSourceLayerId)
.query(s"mt_partition=in=($berlinPartition, $munichPartition)")
.load()
val transformedData = sourceRoadTopologyData
.select(col("partition_name"), explode(col("segment")) as "road")
.select(col("partition_name"),
col("road.identifier") as "identifier",
explode(col("road.geometry.point")) as "points",
col("road.length") as "length")
.select("partition_name", "identifier", "points.*", "length")
val indexLayerData = transformedData
.withColumn("idx_tile_id", col("partition_name"))
.withColumn("idx_ingestion_timestamp", unix_timestamp())
indexLayerData
.writeLayer(inputRoadTopologyCatalogHrn, inputIndexAvroLayerId)
.save()
val versionedLayerData = sourceRoadTopologyData
.withColumn("mt_partition", col("partition_name"))
versionedLayerData
.writeLayer(inputRoadTopologyCatalogHrn, inputVersionedProtobufLayerId)
.save()
val customRawLayerId = transformedData
.select(col("length"))
.map(v => s"${lit("road-partition-")}:${lit(v.mkString).toString}".getBytes)
.select(col("value") as "data")
.withColumn("mt_partition", concat(lit("partition-"), monotonically_increasing_id))
customRawLayerId
.writeLayer(inputRoadTopologyCatalogHrn, inputVersionedLayerCustomId)
.withDataConverter(new VersionedDataConverter {
override def serializeGroup(rowMetadata: VersionedRowMetadata,
rows: Iterator[Row]): GroupedData[VersionedRowMetadata] = {
val bytes = rows.next().getAs[Array[Byte]]("data")
GroupedData(rowMetadata, bytes)
}
})
.save()
logger.info(s"Finished setting up data sources in catalog: $inputRoadTopologyCatalogHrn")
sparkSession.stop()
}
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.data.client.spark.javadsl.VersionedDataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.VersionedRowMetadata;
import com.here.platform.pipeline.PipelineContext;
import java.util.Iterator;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DataSourcesSetup {
private static PipelineContext pipelineContext = new PipelineContext();
private static HRN roadTopologyDataSourceCatalogHrn =
pipelineContext.getConfig().getInputCatalogs().get("roadTopologySource");
private static String roadTopologyDataSourceLayerId = "topology-geometry";
private static HRN inputRoadTopologyCatalogHrn =
pipelineContext.getConfig().getInputCatalogs().get("roadTopologyInput");
private static String inputIndexAvroLayerId = "index-layer-avro-data";
private static String inputVersionedProtobufLayerId = "versioned-layer-protobuf-data";
private static String inputVersionedLayerCustomId = "versioned-layer-custom-data";
private static SparkSession sparkSession =
SparkSession.builder().appName("DataSourcesSetupPipeline").getOrCreate();
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger(DataSourcesSetup.class);
String berlinPartition = "23618361";
String munichPartition = "23611420";
Dataset<Row> sourceRoadTopologyData =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(roadTopologyDataSourceCatalogHrn, roadTopologyDataSourceLayerId)
.query(String.format("mt_partition=in=(%s, %s)", berlinPartition, munichPartition))
.load();
Dataset<Row> transformedData =
sourceRoadTopologyData
.select(col("partition_name"), explode(col("segment")).as("road"))
.select(
col("partition_name"),
col("road.identifier").as("identifier"),
explode(col("road.geometry.point")),
col("road.length").as("length"))
.select("partition_name", "identifier", "col.*", "length");
Dataset<Row> indexLayerData =
transformedData
.withColumn("idx_tile_id", col("partition_name"))
.withColumn("idx_ingestion_timestamp", unix_timestamp());
JavaLayerDataFrameWriter.create(indexLayerData)
.writeLayer(inputRoadTopologyCatalogHrn, inputIndexAvroLayerId)
.save();
Dataset<Row> versionedLayerData =
sourceRoadTopologyData.withColumn("mt_partition", col("partition_name"));
JavaLayerDataFrameWriter.create(versionedLayerData)
.writeLayer(inputRoadTopologyCatalogHrn, inputVersionedProtobufLayerId)
.save();
Dataset<Row> customRawLayerId =
transformedData
.select(col("length"))
.map((MapFunction<Row, byte[]>) row -> row.toString().getBytes(), Encoders.BINARY())
.select(col("value").as("data"))
.withColumn("mt_partition", concat(lit("partition-"), monotonically_increasing_id()));
JavaLayerDataFrameWriter.create(customRawLayerId)
.writeLayer(inputRoadTopologyCatalogHrn, inputVersionedLayerCustomId)
.withDataConverter(
new VersionedDataConverter() {
@Override
public GroupedData<VersionedRowMetadata> serializeGroup(
VersionedRowMetadata rowMetadata, Iterator<Row> rows) {
byte[] bytes = rows.next().getAs("data");
return new GroupedData<>(rowMetadata, bytes);
}
})
.save();
logger.info(
String.format(
"Finished setting up data sources in catalog: %s", inputRoadTopologyCatalogHrn));
sparkSession.stop();
}
}
Implement the Spark connector application
This application uses the data sources created in the previous stage to read from different layers and data formats. It performs some transformations on the resulting data and writes the results to the output layers of the catalog created earlier.
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.data.client.spark.scaladsl.{
GroupedData,
VersionedDataConverter,
VersionedRowMetadata
}
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row, SparkSession}
import org.slf4j.LoggerFactory
object SparkConnectorScala extends App {
private val logger = LoggerFactory.getLogger(SparkConnectorScala.getClass)
private val pipelineContext = new PipelineContext
private val inputRoadTopologyCatalogHrn =
pipelineContext.config.inputCatalogs("roadTopologyInput")
private val outputRoadTopologyCatalogHrn = pipelineContext.config.outputCatalog
private val inputIndexAvroLayerId = "index-layer-avro-data"
private val inputVersionedProtobufLayerId = "versioned-layer-protobuf-data"
private val versionedLayerCustomId = "versioned-layer-custom-data"
private val outputVolatileParquetLayerId = "volatile-layer-parquet-data"
val sparkSession: SparkSession =
SparkSession.builder().appName("SparkConnectorScalaPipeline").getOrCreate()
import sparkSession.implicits._
private val berlinPartition = "23618361"
private val munichPartition = "23611420"
val avroData: DataFrame = sparkSession
.readLayer(inputRoadTopologyCatalogHrn, inputIndexAvroLayerId)
.query(s"tile_id==$munichPartition")
.load()
avroData.printSchema()
avroData.show()
avroData.collect()
val protobufData: DataFrame = sparkSession
.readLayer(inputRoadTopologyCatalogHrn, inputVersionedProtobufLayerId)
.query(s"mt_partition=in=($berlinPartition, $munichPartition)")
.load()
case class RoadSegmentTopology(identifier: String,
partition_name: String,
points: Array[SegmentPoint],
length: Double)
case class SegmentPoint(latitude: String, longitude: String, z_level: Integer, elevation: Double)
val transformedData = protobufData
.select(col("partition_name"), explode(col("segment")) as "road")
.select(col("road.identifier") as "identifier",
col("partition_name"),
col("road.geometry.point") as "points",
col("road.length") as "length")
.as[RoadSegmentTopology]
val statsByPartition: DataFrame = transformedData
.groupBy("partition_name")
.agg(round(avg("length")) as "average_length",
round(stddev("length")) as "std_length",
count("points") as "number_of_points")
.withColumn("mt_partition", col("partition_name"))
statsByPartition
.writeLayer(outputRoadTopologyCatalogHrn, outputVolatileParquetLayerId)
.save()
val customReadData: Dataset[String] = sparkSession
.readLayer(inputRoadTopologyCatalogHrn, versionedLayerCustomId)
.query(s"mt_partition=in=(partition-1, partition-2, partition-3)")
.load()
.map { row =>
row.getAs[Array[Byte]]("data").map(_.toChar).mkString
}(Encoders.STRING)
val customData: DataFrame = customReadData
.map(_.getBytes)
.toDF
.withColumn("data", col("value"))
.select(col("value") as "data")
.withColumn("mt_partition", concat(lit("id-"), monotonically_increasing_id))
customData
.writeLayer(outputRoadTopologyCatalogHrn, versionedLayerCustomId)
.withDataConverter(new VersionedDataConverter {
override def serializeGroup(rowMetadata: VersionedRowMetadata,
rows: Iterator[Row]): GroupedData[VersionedRowMetadata] = {
val bytes = rows.next().getAs[Array[Byte]]("data")
GroupedData(rowMetadata, bytes)
}
})
.save()
logger.info(s"Finished writing data to output catalog: $outputRoadTopologyCatalogHrn")
sparkSession.stop()
}
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.data.client.spark.javadsl.VersionedDataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.VersionedRowMetadata;
import com.here.platform.pipeline.PipelineContext;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SparkConnector {
private static final PipelineContext pipelineContext = new PipelineContext();
private static final HRN inputRoadTopologyCatalogHrn =
pipelineContext.getConfig().getInputCatalogs().get("roadTopologyInput");
private static final HRN outputRoadTopologyCatalogHrn =
pipelineContext.getConfig().getOutputCatalog();
private static final String INPUT_INDEX_AVRO_LAYER_ID = "index-layer-avro-data";
private static final String INPUT_VERSIONED_PROTOBUF_LAYER_ID = "versioned-layer-protobuf-data";
private static final String VERSIONED_LAYER_CUSTOM_ID = "versioned-layer-custom-data";
private static final String OUTPUT_VOLATILE_PARQUET_LAYER_ID = "volatile-layer-parquet-data";
public static void main(String[] args) {
SparkSession sparkSession =
SparkSession.builder().appName("SparkConnectorPipeline").getOrCreate();
Logger logger = LoggerFactory.getLogger(SparkConnector.class);
String berlinPartition = "23618361";
String munichPartition = "23611420";
Dataset<Row> avroData =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(inputRoadTopologyCatalogHrn, INPUT_INDEX_AVRO_LAYER_ID)
.query(String.format("tile_id==%s", munichPartition))
.load();
avroData.printSchema();
avroData.show();
avroData.collect();
Dataset<Row> protobufData =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(inputRoadTopologyCatalogHrn, INPUT_VERSIONED_PROTOBUF_LAYER_ID)
.query(String.format("mt_partition=in=(%s, %s)", berlinPartition, munichPartition))
.load();
Encoder<RoadSegmentTopology> roadSegmentTopologyEncoder =
Encoders.bean(RoadSegmentTopology.class);
Dataset<RoadSegmentTopology> transformedData =
protobufData
.select(col("partition_name"), explode(col("segment")).as("road"))
.select(
col("road.identifier").as("identifier"),
col("partition_name"),
col("road.geometry.point").as("points"),
col("road.length").as("length"))
.as(roadSegmentTopologyEncoder);
Dataset<Row> statsByPartition =
transformedData
.groupBy("partition_name")
.agg(
round(avg("length")).as("average_length"),
round(stddev("length")).as("std_length"),
count("points").as("number_of_points"))
.withColumn("mt_partition", col("partition_name"));
JavaLayerDataFrameWriter.create(statsByPartition)
.writeLayer(outputRoadTopologyCatalogHrn, OUTPUT_VOLATILE_PARQUET_LAYER_ID)
.save();
Dataset<Row> customData =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(inputRoadTopologyCatalogHrn, VERSIONED_LAYER_CUSTOM_ID)
.query("mt_partition=in=(partition-1, partition-2, partition-3)")
.load();
Dataset<Row> partitionedData =
customData.withColumn(
"mt_partition", concat(lit("partition-"), monotonically_increasing_id()));
JavaLayerDataFrameWriter.create(partitionedData)
.writeLayer(outputRoadTopologyCatalogHrn, VERSIONED_LAYER_CUSTOM_ID)
.withDataConverter(
new VersionedDataConverter() {
@Override
public GroupedData<VersionedRowMetadata> serializeGroup(
VersionedRowMetadata rowMetadata, Iterator<Row> rows) {
byte[] bytes = rows.next().getAs("data");
return new GroupedData<>(rowMetadata, bytes);
}
})
.save();
logger.info(
String.format(
"Finished writing data to output catalog: %s", outputRoadTopologyCatalogHrn));
sparkSession.stop();
}
public static class RoadSegmentTopology implements Serializable {
private String identifier;
private String partition_name;
private SegmentPoint[] points;
private Double length;
}
public static class SegmentPoint implements Serializable {
private String latitude;
private String longitude;
private Integer z_level;
private Double elevation;
}
}
Compile and run locally
To set up the data sources locally, execute one of the following commands, the first for the Scala application and the second for the Java application:
mvn compile exec:java \
-Dexec.mainClass=DataSourcesSetupScala \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dexec.cleanupDaemonThreads=false \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
mvn compile exec:java \
-Dexec.mainClass=DataSourcesSetup \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dexec.cleanupDaemonThreads=false \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
For the main part of the tutorial, once the data sources are in place, run one of the following commands (again, Scala first, then Java):
mvn compile exec:java \
-Dexec.mainClass=SparkConnectorScala \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dexec.cleanupDaemonThreads=false \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
mvn compile exec:java \
-Dexec.mainClass=SparkConnector \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dexec.cleanupDaemonThreads=false \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
For additional details on the topics covered in this tutorial, you can refer to the following sources: