Compose an Incremental Processing Pipeline with DeltaSets

NOTE: DeltaSets are a new feature of the Data Processing Library, and the API might change in future versions. DeltaSets can only be used in Scala projects, not in Java projects.

DeltaSets are a new distributed processing abstraction provided by the Data Processing Library. Similar to Spark RDDs, DeltaSets provide a functional interface for transforming data in a cluster, with transformations such as mapReduce and filterByKey. Their main difference from RDDs is that DeltaSet transformations can be computed incrementally if required.

DeltaSets allow you to build custom Compilation Patterns, which means that you can have compilers with as many resolveFn, compileInFn, and compileOutFn functions as required by your particular application.

Design

The main processing abstraction is a DeltaSet[K, V], where K is a type of key and V is a type of value. The DeltaSet represents a collection of key-value pairs that is stored and transformed in a Spark cluster. A key can be associated with one value only.

  • K is often com.here.platform.data.processing.catalog.Partition.Key, which identifies a partition inside the platform catalog. However, K can be any type that is Serializable and that has an implicit Ordering defined. Examples of these types include strings, integers, and tuples.

  • V is often com.here.platform.data.processing.catalog.Partition.Meta, which identifies data in the platform catalog, or com.here.platform.data.processing.blobstore.Payload, which holds the actual data stored in the catalog. However, V can be any type, even an integer or a string.

For example, reading the contents of a platform catalog layer results in a DeltaSet[Key, Meta].

A DeltaSet is always immutable, but transformations can be applied to it resulting in a transformed DeltaSet. For example, the transformation mapValues({x => x + 1}) can be used to transform a DeltaSet[Key, Int] into a new DeltaSet[Key, Int] in which all values are incremented by one.

Once a DeltaSet is transformed into DeltaSet[Key, Payload] to contain the desired payloads for the output catalog, we can publish it. This results in a PublishedSet which we can then commit as a new version of the output catalog.

The transformations are always lazy, which means that they are only performed when you commit the output catalog. In other words, a DeltaSet is not evaluated until it is committed to the platform catalog.
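The deferred evaluation described above can be pictured with a small plain-Scala analogy. This sketch does not use the Data Processing Library: LazyPipelineSketch, commit, and evaluations are hypothetical names, a Map stands in for a DeltaSet, and a lazy val stands in for the library's own scheduling. All of this is an assumption made purely for illustration.

```scala
object LazyPipelineSketch {
  // Counts how often the transformation function actually runs.
  var evaluations = 0

  // Stand-in for an immutable input collection of key-value pairs.
  val input: Map[String, Int] = Map("a" -> 1, "b" -> 2)

  // Declaring the transformation does no work yet: `lazy val` defers it.
  // The input map itself is never modified; a new map is produced instead.
  lazy val incremented: Map[String, Int] =
    input.map { case (k, v) => evaluations += 1; k -> (v + 1) }

  // Stand-in for committing the output: this is what forces the evaluation.
  def commit(): Map[String, Int] = incremented
}
```

Until commit() is called, evaluations stays at zero, which mirrors the statement that a DeltaSet is not evaluated until it is committed.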

Example: Copy a Layer

This example shows how to implement a pipeline that copies a layer from one catalog to another.

The simplest way to use DeltaSets is to extend the DeltaSimpleSetup in the application's Main object. To add support for the standard configuration files and command line options for pipelines, in this example we also extend the PipelineRunner trait (see Set up and Run the Driver), giving our Main object the following skeleton:

import com.here.platform.data.processing.catalog.{Catalog, Layer}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
import com.here.platform.data.processing.driver.runner.pipeline.PipelineRunner

object Main extends PipelineRunner with DeltaSimpleSetup {

  val applicationVersion: String = "1.0"

  def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
    ???
  }
}

DeltaSimpleSetup requires the Main object to implement the setupSets method, which defines the processing logic of the pipeline. Processing logic defined using DeltaSets can be structured in many different ways, typically in four phases:

  1. Query the Key and Meta pairs from one or more input catalogs resulting in a DeltaSet[Key, Meta].
  2. Retrieve the payloads corresponding to the metadata resulting in a DeltaSet[Key, Payload].
  3. Transform the data stored in the payloads and rewrite the keys to store the target catalog and the target layer.
  4. Publish the transformed payloads, resulting in a PublishedSet that is then committed to the output catalog.

In this example, we copy the payloads without modifying them, instead of transforming them in Step 3.

  1. Query: To query the Key and Meta pairs from one or more input catalogs, the setupSets method receives a DeltaContext as an argument, which provides, among other things, access to the input catalogs.

    import com.here.platform.data.processing.catalog.Partition._
    val keyMetas: DeltaSet[Key, Meta] =
      context.queryCatalogLayer(Catalog.Id("inCatalogA"), Layer.Id("inLayerA"))
    
  2. Retrieve: To retrieve the Payloads corresponding to the metadata, first we need to import DeltaSet transformations from the context, and get an instance of the Retriever object for the corresponding catalog. Then, keyMetas can be transformed.

    import com.here.platform.data.processing.blobstore.Payload
    import context.transformations._ // This import enables transformations on DeltaSets
    val retriever = context.inRetriever(Catalog.Id("inCatalogA"))
    val keyPayloads: DeltaSet[Key, Payload] =
      keyMetas.mapValuesWithKey((key, meta) => retriever.getPayload(key, meta))
    
  3. Process: To rewrite the keys to store the target catalog and the target layer, we use the mapKeys operation.

    val rewrittenKeys: DeltaSet[Key, Payload] =
      keyPayloads.mapKeys(
        OneToOne(
          _.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
          _.copy(catalog = Catalog.Id("inCatalogA"), layer = Layer.Id("inLayerA"))
        ),
        PreservesPartitioning
      )
    
  4. Publish: To publish the transformed payloads, a DeltaSet[Key, Payload] provides a publish operation, which takes a set of layers to publish to as arguments.

    val result: PublishedSet = rewrittenKeys.publish(Set(Layer.Id("outLayerA")))
    Iterable(result)
    

The complete example is shown below:

import com.here.platform.data.processing.catalog.{Catalog, Layer, Partition}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
import com.here.platform.data.processing.driver.runner.pipeline.PipelineRunner

object Main extends PipelineRunner with DeltaSimpleSetup {

  val applicationVersion: String = "1.0"

  def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
    import context.transformations._
    val retriever = context.inRetriever(Catalog.Id("inCatalogA"))
    Iterable(
      context
        .queryCatalogLayer(Catalog.Id("inCatalogA"), Layer.Id("inLayerA"))
        .mapValuesWithKey((key, meta) => retriever.getPayload(key, meta))
        .mapKeys(
          OneToOne[Partition.Key, Partition.Key](
            _.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
            _.copy(catalog = Catalog.Id("inCatalogA"), layer = Layer.Id("inLayerA"))
          ),
          PreservesPartitioning
        )
        .publish(Set(Layer.Id("outLayerA")))
    )
  }

}

Transformations

This section explains the transformations currently available on DeltaSets.

Publish Payloads

Every DeltaSet must eventually be turned into a set of payloads which contain the data to be published in the output catalog. The publish operation is available on any DeltaSet of type DeltaSet[Key, Payload] and this operation uploads all payloads to the Data API. The result of the publish operation is a PublishedSet that can only be returned from setupSets, but cannot be transformed any further. This is described in the example above.

val payloads: DeltaSet[Key, Payload] = ???
val published: PublishedSet = payloads.publish(Set(Layer.Id("outLayer")))

Transform Values

Use mapValues and mapValuesWithKey to transform the values inside of a DeltaSet. Both these operations do not modify the keys in the DeltaSet, hence they do not need to shuffle data between worker nodes in the cluster when they are run. Consequently, these operations are very efficient.

In the following example, the values in a DeltaSet[Key, Int] are incremented by one using mapValues.

val integers: DeltaSet[Key, Int] = ???
val incrementedIntegers: DeltaSet[Key, Int] = integers.mapValues(_ + 1)

mapValuesWithKey is a similar operation, but it also provides the key to the transformation function. See the copy-a-layer example for details on using this operation.

Transform Keys and Values

To transform both keys and values simultaneously, or to transform the key in a key-value pair based on its value, use one of these transformations: mapUnique, mapGroup, or mapReduce. However, if performance is a concern, then consider using either mapValues, mapValuesWithKey or one of the mapKeys* operations.

mapUnique transforms a key-value pair into a new key-value pair, as long as no duplicate keys are produced. If duplicate keys are produced, the transformation will fail at run-time. Since data is shuffled between nodes in the cluster, a partitioner must be explicitly provided as an argument.

For example, in the following snippet, key-value pairs are split into two layers according to the sign of the value:

  • Non-negative values: set the layer to positive_values.
  • Negative values: set the layer to negative_values.

val deltaSet1: DeltaSet[Key, Int] = ???
val split: DeltaSet[Key, Int] =
  deltaSet1.mapUnique(
    mapFn = { (key, i) =>
      if (i >= 0) {
        (key.copy(layer = Layer.Id("positive_values")), i)
      } else {
        (key.copy(layer = Layer.Id("negative_values")), i)
      }
    },
    partitioning = context.defaultPartitioner
  )

mapGroup transforms a key-value pair into a new key-value pair. The values of duplicate keys are grouped, such that the resulting DeltaSet assigns each key a collection of values.

For example, in the following snippet, metadata associated with HERE tile partitions are mapped to and grouped by their parent HERE tile.

val deltaSet1: DeltaSet[Key, Meta] = ???
val deltaSet2: DeltaSet[Key, Iterable[Meta]] =
  deltaSet1.mapGroup(
    mapFn = {
      case (key, value) =>
        (key.copy(partition = key.partition.parent.getOrElse(key.partition)), value)
    },
    partitioning = context.defaultPartitioner
  )

mapReduce transforms each key-value pair into a new key-value pair and reduces the values of duplicate keys with the reduce function that you provide. This reduce function must combine two values into one. Using mapReduce is more efficient than using mapGroup and then reducing each value using mapValues.

For example, in the following snippet, all integers associated with HERE tile partitions are mapped to their parent HERE tile and the value of each output HERE tile is reduced to its sum.

val deltaSet1: DeltaSet[Key, Int] = ???
val deltaSet2: DeltaSet[Key, Int] =
  deltaSet1.mapReduce(
    mapFn = {
      case (key, value) =>
        (key.copy(partition = key.partition.parent.getOrElse(key.partition)), value)
    },
    reduceFn = _ + _,
    partitioning = context.defaultPartitioner
  )

These mapUnique, mapGroup, and mapReduce operations take a key-value mapping function to produce exactly one key. In contrast, flatMapUnique, flatMapGroup, and flatMapReduce operations take a mapping function to produce zero or more keys.
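The difference between the three operations can be illustrated with ordinary Scala collections. The following is only a model of the semantics described above, not the library implementation: KeyValueMapSketch and its methods are hypothetical, an in-memory Map stands in for a DeltaSet, and partitioning is ignored entirely.

```scala
object KeyValueMapSketch {
  // Model of mapUnique: fails if two inputs are mapped to the same output key.
  def mapUnique[K, V, K2, V2](in: Map[K, V])(f: (K, V) => (K2, V2)): Map[K2, V2] = {
    val out = in.toSeq.map { case (k, v) => f(k, v) }
    require(out.map(_._1).distinct.size == out.size, "duplicate output keys")
    out.toMap
  }

  // Model of mapGroup: values that end up under the same key are collected.
  def mapGroup[K, V, K2, V2](in: Map[K, V])(f: (K, V) => (K2, V2)): Map[K2, Seq[V2]] =
    in.toSeq
      .map { case (k, v) => f(k, v) }
      .groupBy(_._1)
      .map { case (k, pairs) => (k, pairs.map(_._2)) }

  // Model of mapReduce: values under the same key are combined pairwise.
  def mapReduce[K, V, K2, V2](in: Map[K, V])(f: (K, V) => (K2, V2))(
      reduce: (V2, V2) => V2): Map[K2, V2] =
    mapGroup(in)(f).map { case (k, values) => (k, values.reduce(reduce)) }
}
```

For instance, mapping the keys "a1", "a2", and "b1" to their first character produces a duplicate key "a": the mapUnique model rejects it, the mapGroup model collects both values, and the mapReduce model combines them.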

Transform Keys

DeltaSets provide a set of transformations for modifying the keys in a DeltaSet without taking into account values. These transformations are very efficient and should be preferred over transformations based on keys and values if possible.

For example, mapKeys is an efficient operation to transform just the keys in a DeltaSet without reading or writing the values. The key transformation must be 1-to-1, that is, every key in the input DeltaSet is mapped to a unique key in the output DeltaSet. To ensure that the transformation is 1-to-1 and to allow efficient incremental processing, mapKeys requires you to specify both the key mapping function, mapFn, and the inverse of that function, inverseFn:

val partitioner = NameHashPartitioner(10)
val input: DeltaSet[Key, Int] = ???
val rewrittenKeys: DeltaSet[Key, Int] =
  input.mapKeys(
    OneToOne(
      mapFn = key => key.copy(layer = Layer.Id("outLayerA")),
      inverseFn = key => key.copy(layer = Layer.Id("inLayerA"))
    ),
    partitioner
  )

See the copy-a-layer example for more details on how to use this operation.

If the DeltaSet contains a key x for which inverseFn(mapFn(x)) != x, the transformation fails at run-time. If it is inconvenient or impossible to specify the inverse transformation, consider using the more expensive mapUnique transformation explained above.
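The round-trip condition on mapFn and inverseFn can be modelled in a few lines of plain Scala. This is a sketch of the condition only, not of how the library performs the check: OneToOneSketch, its simplified Key case class, and mapKeysChecked are hypothetical names introduced for illustration.

```scala
object OneToOneSketch {
  // Simplified stand-in for a partition key; all fields are plain strings here.
  final case class Key(catalog: String, layer: String, partition: String)

  // Applies mapFn to every key and fails if inverseFn does not lead back
  // to the original key, that is, if inverseFn(mapFn(k)) != k.
  def mapKeysChecked[V](data: Map[Key, V])(mapFn: Key => Key, inverseFn: Key => Key): Map[Key, V] =
    data.map { case (k, v) =>
      val mapped = mapFn(k)
      require(inverseFn(mapped) == k, s"inverseFn(mapFn($k)) != $k")
      mapped -> v
    }
}
```

Because inverseFn can return only one key for each output key, the same condition also rules out two different input keys being mapped to the same output key.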

flatMapKeys is a transformation that maps each input key to zero or more output keys (1-to-many). Similar to mapKeys, an inverse function must be passed to show that each output key is the result of exactly one input key.

When mapping multiple input keys to the same output key (n-to-1 or m-to-n), the set of values can either be grouped or reduced, just like in key and value transformations. DeltaSets provide four transformations to cover all combinations of grouping/reducing and n-to-1/m-to-n: mapKeysGroup and flatMapKeysGroup group all values in a collection, whereas mapKeysReduce and flatMapKeysReduce apply a reduce function to all values. See DirectMToNCompiler migration for an example of how to use flatMapKeysGroup.

Filter Data

filterByKey filters key-value pairs from a DeltaSet based only on their keys. This transformation runs very efficiently and does not require any data exchange between nodes in the Spark cluster.

In the following snippet, the administrative_places layer from the hmc catalog is queried, and only the partition keys whose generic partition name starts with "1469256839" are kept. This corresponds to reading all administrative places in Australia if the hmc catalog ID points to the HERE Map Content catalog.

import com.here.platform.data.processing.catalog.Partition.Generic
val filteredInput: DeltaSet[Key, Meta] =
  context
    .queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("administrative_places"))
    .filterByKey {
      case Key(_, _, Generic(name)) => name.startsWith("1469256839")
      case _                        => false // Drop keys that are not generic partitions
    }

For convenience, you can also use Partition Key Filters in a filterByKey operation. For example, to keep only those partitions whose partition name is a HERE tile that lies inside a given geographic bounding box, use the following filterByKey operation.

val filteredInput: DeltaSet[Key, Meta] =
  context
    .queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("road_attributes"))
    .filterByKey(
      BoundingBoxFilter(
        south = 50.97656,
        west = 11.95313,
        north = 51.06445,
        east = 12.04102
      )
    )

Partition Key Filters can also be defined from the configuration file, in the path here.platform.data-processing.deltasets.partitionKeyFilters. The partition key filters defined in the configuration file will apply to all query transformations and readBack.

Join Data

join is a DeltaSet transformation that takes two DeltaSets and produces a DeltaSet that contains, for each key contained in both DeltaSets, the pair of values associated with the key in each of the DeltaSets.

val integers: DeltaSet[Key, Int] = ???
val strings: DeltaSet[Key, String] = ???
val pairs: DeltaSet[Key, (Int, String)] = integers join strings

Other kinds of join transformations provided by DeltaSets are:

  • outerJoin takes two DeltaSets and produces a DeltaSet that contains, for each key contained in either DeltaSet, the pair of values associated with the key in each of the DeltaSets. If a key is not associated with a value in one of the DeltaSets, the corresponding entry in the pair is set to None.
  • leftOuterJoin takes two DeltaSets and produces a DeltaSet that contains, for each key contained in the left DeltaSet, the pair of values associated with the key in each of the DeltaSets. If a key is not associated with a value in the right DeltaSet, the corresponding entry in the pair is set to None.
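The three join flavours described above differ only in which keys survive and which side of the pair becomes optional. The following plain-Scala model makes the resulting types explicit. It is an illustration under the assumption that in-memory Maps behave like the DeltaSets in question, and JoinSketch is a hypothetical name, not part of the library.

```scala
object JoinSketch {
  // Keys present in both inputs; both values are always defined.
  def join[K, A, B](left: Map[K, A], right: Map[K, B]): Map[K, (A, B)] =
    left.collect { case (k, a) if right.contains(k) => (k, (a, right(k))) }

  // Keys present in either input; each side may be missing.
  def outerJoin[K, A, B](left: Map[K, A], right: Map[K, B]): Map[K, (Option[A], Option[B])] =
    (left.keySet ++ right.keySet).map(k => (k, (left.get(k), right.get(k)))).toMap

  // Keys present in the left input; only the right side may be missing.
  def leftOuterJoin[K, A, B](left: Map[K, A], right: Map[K, B]): Map[K, (A, Option[B])] =
    left.map { case (k, a) => (k, (a, right.get(k))) }
}
```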

The following example shows a common use case of joins: two layers, road_attributes and topology_geometry, are queried from the catalog hmc (HERE Map Content). The keys in both resulting DeltaSets are rewritten to contain the same catalog and layer of the output catalog. Then, the outerJoin is computed, resulting in a DeltaSet that contains the metadata for both layers. In this way, the contents of the partitions can be correlated.

val topology: DeltaSet[Key, Meta] =
  context
    .queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("topology_geometry"))
    .mapKeys(
      OneToOne(
        _.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
        _.copy(catalog = Catalog.Id("hmc"), layer = Layer.Id("topology_geometry"))
      ),
      context.defaultPartitioner
    )
val roadAttributes: DeltaSet[Key, Meta] =
  context
    .queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("road_attributes"))
    .mapKeys(
      OneToOne(
        _.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
        _.copy(catalog = Catalog.Id("hmc"), layer = Layer.Id("road_attributes"))
      ),
      context.defaultPartitioner
    )
val pairs: DeltaSet[Key, (Option[Meta], Option[Meta])] =
  topology outerJoin roadAttributes

Note that join is a stateful transformation, while outerJoin and leftOuterJoin are not stateful, which can make the latter two more efficient.

Union Data

disjointUnion is a DeltaSet transformation that takes two input DeltaSets and produces an output DeltaSet that contains every key-value pair contained in either of the input DeltaSets. The operation throws an exception if there is a key that is contained in both input DeltaSets.

In the following snippet, we split an input DeltaSet into two DeltaSets with tiles at zoom levels 10 and 12, respectively. Then, each DeltaSet is transformed separately into strings using mapValues, and the union of the results is stored in the variable combined.

val input: DeltaSet[Key, Meta] = ???
val tilesAt10: DeltaSet[Key, String] =
  input
    .filterByKey {
      case Key(_, _, t: HereTile) => t.quad.getZoomLevel == 10
      case _                      => false // Drop keys that are not HERE tiles
    }
    .mapValues(???)
val tilesAt12: DeltaSet[Key, String] =
  input
    .filterByKey {
      case Key(_, _, t: HereTile) => t.quad.getZoomLevel == 12
      case _                      => false // Drop keys that are not HERE tiles
    }
    .mapValues(???)
val combined: DeltaSet[Key, String] =
  tilesAt10 disjointUnion tilesAt12

In the snippet above, both input DeltaSets have the exact same type. They can also have different value types, as in the following snippet. In this case, the value type of the output DeltaSet is a common supertype of value types of the inputs. In the following snippet, the union of DeltaSet[Key, Int] and DeltaSet[Key, String] is typed as DeltaSet[Key, Any]. This works because, in Scala, Any is a supertype of both String and Int.

val integers: DeltaSet[Key, Int] = ???
val strings: DeltaSet[Key, String] = ???
val pairs: DeltaSet[Key, Any] = integers disjointUnion strings
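Both properties of disjointUnion described in this section, the failure on shared keys and the widening of the value type, can be reproduced with immutable Scala Maps, which are covariant in their value type. The sketch below is a model only: DisjointUnionSketch is a hypothetical name, and the use of require to signal the error is an assumption about how such a check could look, not a statement about the library.

```scala
object DisjointUnionSketch {
  // Combines two maps and fails if any key occurs in both of them.
  def disjointUnion[K, V](left: Map[K, V], right: Map[K, V]): Map[K, V] = {
    val shared = left.keySet.intersect(right.keySet)
    require(shared.isEmpty, s"keys present in both inputs: $shared")
    left ++ right
  }
}
```

Calling disjointUnion[String, Any] with a Map[String, Int] and a Map[String, String] compiles because both are subtypes of Map[String, Any].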

Dynamically Resolve References

mapValuesWithResolver can be used to transform a DeltaSet of key-meta pairs, the subjects, and dynamically access other partitions, the references, during the transformation. It is an alternative to static reference resolution using resolveReferences or a RefTreeCompiler, which requires pre-computing all required references up-front. In contrast, dynamic reference resolution is more flexible, requires fewer lines of code and can be faster than static reference resolution, especially for complex reference structures.

mapValuesWithResolver is similar to mapValuesWithKey. However, the mapping function that is applied to each subject takes three arguments: the key and metadata of the subject, and a Resolver that determines the metadata for any key that may be referenced by the subject. Using the metadata of the subject and the references, you can retrieve the corresponding payloads.

The Resolver uses one or more ResolutionStrategy instances to find the metadata that corresponds to a key. One such strategy is DirectQuery, which requests the metadata directly via the Data API. This is simple, but it requires one network query for each metadata object that is resolved. The next section presents three other resolution strategies that are more efficient because they download large sets of metadata at once.

In the following snippet, we resolve references from partitions in layer A of catalog inA to partitions in layer B of catalog inA. Each partition in layer A references the name of a partition in layer B. Typically, for example when processing HERE Map Content, the partition's name is stored with other data. However, for the purpose of this example, we assume that there's no other information in partitions in layer A.

First, we query the layer A into subjects DeltaSet. Then we call mapValuesWithResolver on it, passing a mapping function and a strategies parameter, which is set to DirectQuery.

Inside the mapping function:

  1. We retrieve the partition in layer A using the retriever.
  2. We convert the content of the partition to a string, referenceName, and construct a Key object referencing the referenceName partition in layer B.
  3. We use the resolver to retrieve the metadata corresponding to the reference.
  4. If the referenced partition does not exist, we throw an exception.
  5. Otherwise, we retrieve that partition.

val retriever = context.inRetriever(Catalog.Id("inA"))
val subjects: DeltaSet[Key, Meta] =
  context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("A"))
subjects.mapValuesWithResolver(
  mapFn = {
    case (resolver, key, meta) =>
      // Retrieve and decode the payload. String construction is a placeholder for
      // decoding the partition and getting a reference.
      val referenceName = new String(retriever.getPayload(key, meta).content)
      // Construct a key for the partition with "referenceName" in layer B.
      val referenceKey = Key(Catalog.Id("inA"), Layer.Id("B"), Generic(referenceName))
      // Try to find the metadata for `referenceKey`.
      resolver.resolve(referenceKey) match {
        case None                => throw new Exception("Partition does not exist!")
        case Some(referenceMeta) =>
          // Retrieve the referenced partition.
          val referencePartition = retriever.getPayload(referenceKey, referenceMeta)
          ??? // TODO: Do something with the partition
      }
  },
  strategies = List(DirectQuery(Catalog.Id("inA"), Set(Layer.Id("B"))))
)

Resolution Strategies

We currently provide four resolution strategies:

  1. DirectQuery -- Given a catalog, and a set of layers, this strategy directly retrieves the metadata individually for each key via the Data API. The result of the request is cached per executor. The size of the cache can be configured by passing an argument to the constructor of DirectQuery; the default is 10000 metadata objects (around 3MB).

  2. Broadcast -- Given a DeltaSet containing metadata, this strategy sends a complete copy of the metadata to each Spark executor, making sure that the whole metadata is available on each executor without further network requests. Internally, this strategy uses a Spark broadcast variable. Depending on the amount of metadata, this may require a large amount of memory in each executor. The memory required for storing the metadata is roughly 300 bytes per partition in the DeltaSet.

  3. BackwardResolution -- Suppose you have a DeltaSet containing references and a function that maps each reference to a set of subject partitions. BackwardResolution exposes these references to the resolver when processing the subject partition, without any further network queries. For example, in the following snippet each partition of layer A is grouped with all of its children in layer B.

    BackwardResolution(
      context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")), { (key, meta) =>
        Set(key.copy(layer = Layer.Id("A"), partition = key.partition.parent.get))
      }
    )
    

    The predefined backward resolution strategy BackwardResolution.toSamePartition groups each subject partition with the reference partition of the same name. Similarly, BackwardResolution.toNeighbors groups each subject tile with all its neighbor tiles at a given depth.

  4. ForwardResolution -- Suppose you have a DeltaSet containing references and a function that maps each subject to a set of reference partitions. ForwardResolution exposes these references to the resolver when processing the subject partition, without any further network queries. This strategy is the inverse of BackwardResolution. ForwardResolution takes the key and value types of the subject DeltaSet as type parameters. The following snippet shows how to group each subject tile with all neighbor tiles.

    ForwardResolution[Key, Meta](
      context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")), {
        case (key @ Key(_, _, tile: HereTile), meta) =>
          tile
            .neighbors(1)
            .map(tile => key.copy(layer = Layer.Id("B"), partition = tile))
      }
    )
    

mapValuesWithResolver takes a list of resolution strategies, allowing them to be combined sequentially. Consider, for example, the resolution strategy list in the following snippet. When processing partitions of layer A and resolving references to tiles in layer B, all direct neighbors of the currently processed tile are available without a network request. To resolve other tiles in layer B (either farther away or at a different zoom level), a network request is performed. References to layer C are always resolved using network requests, while references to layer D are resolved using a broadcast.

val strategies =
  List(
    BackwardResolution.toNeighbors(
      context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")),
      Catalog.Id("inA"),
      Layer.Id("A")
    ),
    DirectQuery(Catalog.Id("inA"), Set(Layer.Id("B"), Layer.Id("C"))),
    Broadcast(context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("D")))
  )
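The sequential combination of strategies can be thought of as a chain of lookups in which the first strategy that knows a key wins. The following plain-Scala model illustrates that idea. It is not the library's Resolver: ResolverChainSketch and the Strategy type alias are hypothetical, and each strategy is reduced to a function from a key to optional metadata.

```scala
object ResolverChainSketch {
  // A strategy either knows the metadata for a key or it does not.
  type Strategy[K, M] = K => Option[M]

  // Tries the strategies in list order and returns the first hit.
  // The iterator is lazy, so later strategies are not consulted after a hit.
  def resolve[K, M](strategies: List[Strategy[K, M]])(key: K): Option[M] =
    strategies.iterator
      .map(strategy => strategy(key))
      .collectFirst { case Some(meta) => meta }
}
```

In this model, placing a cheap local strategy before an expensive remote one means that the remote strategy is only invoked for keys that the local one cannot answer.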

mapValuesWithResolver can be applied to a DeltaSet[Key, Meta], in which case the partitioner of the subject DeltaSet is also used to partition the references. If mapValuesWithResolver is applied to a DeltaSet with a different key or value type, for example a DeltaSet[HereTile, String], a partitioner for the references must be provided. The strategies list from the previous snippet can be reused unchanged in that case.

Statically Resolve References

resolveReferences is a transformation that implements the same functionality as the first step of a RefTreeCompiler: given a reference relation between catalog partitions, the transformation groups each partition in a DeltaSet with its references. The reference relation is defined by:

  • a RefTree, which specifies the types of references that may exist between partitions
  • a resolve function, which computes the concrete set of partition keys referenced by a partition

For more information, see RefTreeCompiler.

In the following snippet, resolveReferences is used to group each key-meta pair in deltaSet1 whose partition name is a HERE tile with the key-meta pairs of all of its neighbors.

import com.here.platform.data.processing.compiler.reftree._

import scala.collection.JavaConversions._

val deltaSet1: DeltaSet[Key, Meta] = ???
val deltaSet2: DeltaSet[Key, (Meta, Map[Key, Meta])] =
  deltaSet1.resolveReferences(
    RefTree(
      Subject((Catalog.Id("inCatalogA"), Layer.Id("inLayerA")),
              Ref(RefTree.RefName("neighbor"),
                  (Catalog.Id("inCatalogA"), Layer.Id("inLayerA"))))),
    resolveFn = {
      case (Key(catalog, layer, partition: HereTile), meta) =>
        val neighbor =
          partition.quad.getNeighbors(partition.level).toSet
        Map(RefTree.RefName("neighbor") -> neighbor.map(neighborQuad =>
          Key(catalog, layer, HereTile(neighborQuad))))
      case _ => Map.empty
    }
  )
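The shape of the result, DeltaSet[Key, (Meta, Map[Key, Meta])], may be easier to see in a reduced model. The sketch below groups each entry of an in-memory Map with the entries that a resolve function points to. It is an illustration only: ReferenceGroupingSketch is a hypothetical name, the RefTree is left out, and silently dropping references that do not exist in the data is an assumption of this model.

```scala
object ReferenceGroupingSketch {
  // Pairs every subject with the metadata of the keys returned by resolveFn.
  // References that are not present in `data` are left out of the result map.
  def resolveReferences[K, M](data: Map[K, M])(resolveFn: (K, M) => Set[K]): Map[K, (M, Map[K, M])] =
    data.map { case (key, meta) =>
      val references =
        resolveFn(key, meta).collect { case ref if data.contains(ref) => (ref, data(ref)) }.toMap
      (key, (meta, references))
    }
}
```

With integer keys 1, 2, and 3 and a resolve function that returns key - 1 and key + 1, the entry for key 2 is grouped with the entries for keys 1 and 3.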

Note: resolveReferences behaves differently than the reference resolution step in a RefTreeCompiler in one detail: If Partition Key Filters are defined in the configuration file, they apply to both references and subjects. In RefTreeCompiler, only the subjects are filtered.

Read Back Published Data

For a PublishedSet, readBack is the only transformation that you can apply. It turns the PublishedSet, which is the result of publishing partitions to a layer, into a DeltaSet containing the key and metadata of all partitions contained in that layer after the publishing. This way, partitions that were published in an earlier processing step can be read back and used in the following steps.

In the code snippet below, we are using readBack to read an intermediate result after it has been published to an output catalog layer. We then combine the result of readBack using disjointUnion with a layer from the input catalog hmc (HERE Map Content) and resolve references between these layers.

val intermediate: DeltaSet[Key, Payload] = ???
val intermediatePublished: PublishedSet = intermediate.publish(Set(Layer.Id("intermediate")))
val intermediateAndTopology: DeltaSet[Key, Meta] =
  intermediatePublished.readBack() disjointUnion
    context.queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("topology_geometry"))
val references: DeltaSet[Key, (Meta, Map[Key, Meta])] =
  intermediateAndTopology.resolveReferences(
    RefTree(
      Subject((Default.OutCatalogId, Layer.Id("intermediate")),
              Ref(RefTree.RefName("intermediate_to_topology"),
                  (Catalog.Id("hmc"), Layer.Id("topology_geometry"))))),
    resolveFn = ???
  )

Convert RDDs to DeltaSets

toDeltaSet is an operation that can be used to convert a Spark RDD into a DeltaSet. This can be used, for example, for ingesting data from other sources using Spark and integrating it into a processing pipeline that uses DeltaSets. The RDD must contain key-value pairs and it may not contain more than one pair with the same key. A partitioner to repartition the RDD must be passed to toDeltaSet. Unless the RDD is already partitioned with the given partitioner, the repartitioning of the RDD causes a shuffle.

The resulting DeltaSet does not contain any information about changes since the last run of the pipeline. Consequently, even in an incremental run of the pipeline, downstream DeltaSet transformations process all data in the DeltaSet.

In the following snippet, we show how to ingest a CSV file into a DeltaSet via Spark RDDs. We use the SparkContext, which is accessible via the DeltaContext, to read the CSV file airports.csv. Then, we convert the RDD into key-value form, where the first column of the CSV file serves as the key. Finally, we use toDeltaSet to convert the RDD to a DeltaSet.

import org.apache.spark.rdd.RDD

val sc = context.driverContext.spark
val rowsByFirstColumn: RDD[(Key, Array[String])] =
  sc.textFile("airports.csv").map { x =>
    val columns = x.split(",").map(_.trim)
    (Key('outCatalog, 'outLayer, Generic(columns(0))), columns.drop(1))
  }
val deltaSet: DeltaSet[Key, Array[String]] =
  rowsByFirstColumn.toDeltaSet(context.defaultPartitioner)
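Because the RDD may not contain more than one pair with the same key, it can be useful to check a sample of the input for duplicate first-column values before converting it. The following plain-Scala sketch works on in-memory lines rather than on an RDD. CsvKeyingSketch and both of its methods are hypothetical helpers, and plain strings are used as keys instead of Partition.Key.

```scala
object CsvKeyingSketch {
  // Splits each line on commas; the first column becomes the key and the
  // remaining columns become the value.
  def keyByFirstColumn(lines: Seq[String]): Seq[(String, List[String])] =
    lines.map { line =>
      val columns = line.split(",").map(_.trim).toList
      (columns.headOption.getOrElse(""), columns.drop(1))
    }

  // Keys that occur more than once and would violate the uniqueness requirement.
  def duplicateKeys(rows: Seq[(String, List[String])]): Set[String] =
    rows.groupBy(_._1).collect { case (key, group) if group.size > 1 => key }.toSet
}
```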

Spark Partitions and Shuffles

The data in a DeltaSet is always partitioned according to a specific partitioner, which assigns each key in the DeltaSet to a Spark partition and thereby to a node in the cluster where the data resides. The partitioner is always preserved during DeltaSet transformations unless a new partitioner is explicitly specified in a transformation. In particular, the repartition transformation does nothing but change the partitioner and repartition the data according to the new partitioner.

All transformations that potentially transform the keys in a DeltaSet or change the partitioner require repartitioning the data, and may, therefore, move data between the nodes in the cluster, which is called shuffling data. Shuffling is an expensive operation and should be avoided. See the transformation property table to see which DeltaSet transformations may need to shuffle data.

Partitioning Strategies

Each transformation that shuffles data requires you to explicitly provide a partitioning strategy for the resulting keys. This partitioning strategy can either be a partitioner or the PreservesPartitioning special value.

When you use a partitioner, the transformation uses that partitioner to repartition the result. If there are no performance requirements for your transformation, you can use the defaultPartitioner field in the DeltaContext.

When a transformation changes the keys in a DeltaSet but each key remains in the same Spark partition, you can use PreservesPartitioning. This strategy prevents the transformation from shuffling data altogether, resulting in significant performance improvements. If the transformation cannot preserve the partitioning, an exception is thrown at runtime.

In the copy-a-layer example, we use mapKeys to change the catalog and layer of the keys in a DeltaSet. This DeltaSet is partitioned with the defaultPartitioner, a PartitionNamePartitioner, which groups all catalog partitions with the same name in the same Spark partition, irrespective of the catalog and layer. Consequently, all catalog partitions remain within the same Spark partition and we can use PreservesPartitioning.
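
The reasoning behind this can be sketched with a small local model. It assumes a partitioner that looks only at the partition name of a key. SimpleKey, partitionOf, and preservesPartitioning are hypothetical stand-ins written for this illustration, not the library's Partition.Key or PartitionNamePartitioner.

```scala
object NamePartitioningSketch {

  // Hypothetical stand-in for a catalog key: catalog, layer, and partition name.
  final case class SimpleKey(catalog: String, layer: String, name: String)

  // Assigns a key to one of numPartitions partitions using only its name.
  def partitionOf(key: SimpleKey, numPartitions: Int): Int =
    java.lang.Math.floorMod(key.name.hashCode, numPartitions)

  // True if applying f leaves every key in the partition it was in before.
  def preservesPartitioning(keys: Seq[SimpleKey],
                            f: SimpleKey => SimpleKey,
                            numPartitions: Int): Boolean =
    keys.forall(k => partitionOf(k, numPartitions) == partitionOf(f(k), numPartitions))

  def main(args: Array[String]): Unit = {
    val keys = Seq(
      SimpleKey("inCatalog", "inLayer", "1"),
      SimpleKey("inCatalog", "inLayer", "2")
    )
    // Changing only catalog and layer keeps every key in its partition.
    println(
      preservesPartitioning(keys, _.copy(catalog = "outCatalog", layer = "outLayer"), 8)
    ) // prints true
  }
}
```

A key mapping that changes the partition name gives no such guarantee in this model, which corresponds to the case where PreservesPartitioning cannot be used and a partitioner has to be passed instead.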

The PreservesPartitioning partitioning strategy can even be used when the key types of the upstream and downstream DeltaSets are not the same, as long as the upstream DeltaSet is partitioned by a partitioner that is general enough to also handle the downstream key type. This advanced feature allows you, for example, to map a DeltaSet with Partition.Key keys to a DeltaSet with Partition.Name keys without causing a Spark shuffle:

// Query data from a catalog and change the key type from Partition.Key to Partition.Name
val catalog: Catalog.Id = ???
val layer: Layer.Id = ???
context
  .queryCatalogLayer(catalog, layer)
  .mapKeys(
    OneToOne[Key, Name](_.partition, Key(catalog, layer, _)),
    PreservesPartitioning
  )

Here, PreservesPartitioning can be used because the default partitioner used by queryCatalogLayer is a PartitionNamePartitioner that supports both the upstream key type, Partition.Key, and the downstream key type, Partition.Name.

Laziness and Persist Data

Whenever a DeltaSet is used in two or more transformations, its result should be persisted in memory or on the disk of the Spark workers to avoid computing the result more than once. This is explained in more detail for RDDs in RDD Persistence policy and applies equally to DeltaSets. Use the persist transformation to persist a DeltaSet for reuse, as in the following example:

import org.apache.spark.storage.StorageLevel

val deltaSet1: DeltaSet[Key, Int] = ???
val doubled = deltaSet1
  .mapValues(_ * 2)
  .persist(StorageLevel.MEMORY_AND_DISK_2)
val reuse1 =
  doubled
    .mapValues(x => Payload(BigInt(x).toByteArray))
    .publish(Set(Layer.Id("outLayer1")))
val reuse2 =
  doubled
    .mapValues(x => Payload(BigInt(x / 2).toByteArray))
    .publish(Set(Layer.Id("outLayer2")))
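
The effect of persisting can be illustrated with a local analogy. The sketch below does not use Spark or DeltaSets. It models a lazy upstream computation as a function that is re-run on every use, and models persist as caching the result in a lazy val. The object PersistSketch and its names are hypothetical.

```scala
object PersistSketch {

  // Applies two consumers to a doubled data set and returns both results together
  // with the number of times the doubling function was called.
  def run(data: Vector[Int], persisted: Boolean): (Vector[Int], Vector[Int], Int) = {
    var calls = 0
    // Lazy upstream computation: nothing is evaluated until compute() is called.
    def compute(): Vector[Int] = data.map { x => calls += 1; x * 2 }
    // Stand-in for persist: the result is computed once and then cached.
    lazy val cached: Vector[Int] = compute()
    def doubled: Vector[Int] = if (persisted) cached else compute()

    val reuse1 = doubled.map(_ + 1) // first downstream consumer
    val reuse2 = doubled.map(_ / 2) // second downstream consumer
    (reuse1, reuse2, calls)
  }

  def main(args: Array[String]): Unit = {
    println(run(Vector(1, 2, 3), persisted = false)._3) // prints 6: doubled twice
    println(run(Vector(1, 2, 3), persisted = true)._3)  // prints 3: doubled once
  }
}
```

Both variants produce the same downstream results. Only the amount of repeated upstream work differs, and that work grows with every additional consumer of the unpersisted data.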

Performance Properties

The internal implementations of the different transformations vary in their cost in time and space, which cannot be seen directly from the outside. In particular, two properties make certain transformations more expensive than others: shuffling and statefulness.

A transformation that shuffles data moves data between nodes in the cluster, which uses bandwidth and causes a slow-down of the computation.

A stateful transformation has to store auxiliary information in the state layer of the output catalog to allow incremental computations. This means that extra storage is consumed in the output catalog, extra time is required to compute the state, and additional RAM on the nodes of the cluster is required to persist the state until the end of the computation.
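
The general idea of trading stored state for incremental computation can be sketched as follows. This is a simplified local model, not the library's state layer format or algorithm: it assumes that one hash per key is kept between runs and uses it to find the keys whose values changed. The object StatefulSketch and all names in it are hypothetical.

```scala
object StatefulSketch {

  // Hypothetical state kept between runs: one value hash per key.
  type State = Map[String, Int]

  // Compares the current input with the state of the previous run. Returns the keys
  // that were added, changed, or removed, together with the state for the next run.
  def changedKeys(previous: State, current: Map[String, String]): (Set[String], State) = {
    val newState: State = current.map { case (k, v) => k -> v.hashCode }
    val addedOrChanged =
      newState.collect { case (k, h) if !previous.get(k).contains(h) => k }.toSet
    val removed = previous.keySet -- newState.keySet
    (addedOrChanged ++ removed, newState)
  }

  def main(args: Array[String]): Unit = {
    // First run: there is no state yet, so every key counts as changed.
    val (changed1, state1) = changedKeys(Map.empty, Map("a" -> "x", "b" -> "y"))
    println(changed1) // prints Set(a, b)
    // Second run: only the value of "b" differs, so only "b" needs reprocessing.
    val (changed2, _) = changedKeys(state1, Map("a" -> "x", "b" -> "z"))
    println(changed2) // prints Set(b)
  }
}
```

The model shows both sides of the trade-off described above: the second run touches only one key, but the state has to be computed, stored, and read back for every key.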

Using stateful and/or shuffling operations is inevitable in most pipelines. However, the following table can help you avoid stateful and shuffling transformations wherever possible.

Operation               Shuffles?   Stateful?
detectChanges           no          yes
disjointUnion           no          no
filterByKey             no          no
flatMapGroup            yes [1]     yes
flatMapKeys             yes [1]     no
flatMapKeysGroup        yes [1]     no
flatMapKeysReduce       yes [1]     no
flatMapReduce           yes [1]     yes
flatMapUnique           yes [1]     yes
join                    no          yes
leftOuterJoin           no          no
mapGroup                yes [1]     yes
mapKeys                 yes [1]     no
mapKeysGroup            yes [1]     no
mapKeysReduce           yes [1]     no
mapReduce               yes [1]     yes
mapUnique               yes [1]     yes
mapValues               no          no
mapValuesWithKey        no          no
mapValuesWithResolver   yes         yes
outerJoin               no          no
persist                 no          no
publish                 no          no
readBack                no          no
repartition             yes         no
resolveReferences       yes         yes
toDeltaSet              yes         no

[1] Shuffling can be avoided if PreservesPartitioning is used as a partitioning strategy.

Comparison and Migration

This section compares DeltaSets to the other ways of expressing distributed computation in the Data Processing Library and provides help to migrate to DeltaSets from less flexible interfaces.

Functional Patterns

If you have previously used Functional Patterns, you can understand DeltaSets as the way to build custom compilation patterns, which means that you can have compilers with as many resolveFn, compileInFn, and compileOutFn functions as required by your particular application. However, DeltaSets provide many more ways of structuring the computation: for example, you can reuse the result of one compileInFn in several compileOutFns, join the results of several compileInFns, and publish intermediate results to the output catalog.

Migrate a MapGroupCompiler to DeltaSets

A MapGroupCompiler, expressed in DeltaSet transformations, corresponds to:

  • applying a compileIn function to all key-meta pairs in the inLayers, and grouping the result using flatMapGroup
  • transforming the groups of intermediate data into payloads using the compileOut function
  • publishing the result

The full example is shown below. TODO tags identify the places where you define your intermediate data, input layers, and the MapGroupCompiler.

import com.here.platform.data.processing.blobstore.Retriever
import com.here.platform.data.processing.catalog._
import com.here.platform.data.processing.compiler.{CompileOut1To1Fn, MapGroupCompiler}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._

object MapGroupMain extends DeltaSimpleSetup {

  case class IntermediateData() // TODO: Define the intermediate data of the compiler

  def inLayers: Map[Catalog.Id, Set[Layer.Id]] =
    ??? // TODO: Define the input layers of the compiler

  def constructMapGroupCompiler(retrievers: Map[Catalog.Id, Retriever])
    : MapGroupCompiler[IntermediateData] with CompileOut1To1Fn[IntermediateData] =
    ??? // TODO: Construct the compiler

  def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {

    import context.transformations._

    val retrievers = inLayers.map { case (c, _) => (c, context.inRetriever(c)) }
    val compiler = constructMapGroupCompiler(retrievers)

    // If the compiler defined a particular partitioner, use it. Otherwise, use same partitioner
    // used for partitioning queried metadata (context.defaultPartitioner), a reasonable default.
    val partitioner =
      compiler.outPartitioner(context.defaultParallelism).getOrElse(context.defaultPartitioner)

    val result =
      context
        .queryCatalogs(
          inLayers
        )
        .flatMapGroup(
          Function.untupled(compiler.compileInFn),
          partitioner
        )
        .mapValuesWithKey(
          compiler.compileOutFn
        )
        .publish(
          compiler.outLayers
        )

    Iterable(result)
  }
}

Migrate a RefTreeCompiler to DeltaSets

A RefTreeCompiler, expressed in DeltaSet transformations, corresponds to:

  • grouping a set of subject partitions with their references using resolveReferences
  • applying a compileIn function to all subject-reference pairs, and grouping the result using flatMapGroup
  • transforming the groups of intermediate data into payloads using the compileOut function
  • publishing the result

The full example is shown below. TODO tags identify the places where you define your intermediate data, input layers, and the RefTreeCompiler:

import com.here.platform.data.processing.blobstore.Retriever
import com.here.platform.data.processing.catalog._
import com.here.platform.data.processing.compiler.reftree.CompileInFnWithRefs
import com.here.platform.data.processing.compiler.{CompileOut1To1Fn, RefTreeCompiler}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._

object RefTreeMain extends DeltaSimpleSetup {

  case class IntermediateData() // TODO: Define the intermediate data of the compiler

  def inLayers: Map[Catalog.Id, Set[Layer.Id]] =
    ??? // TODO: Define the input layers of the compiler

  def constructTestRefTreeCompiler(
      retrievers: Map[Catalog.Id, Retriever]): RefTreeCompiler[IntermediateData]
    with CompileInFnWithRefs[IntermediateData]
    with CompileOut1To1Fn[IntermediateData] =
    ??? // TODO: Construct the compiler

  def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {

    import context.transformations._

    val retrievers = inLayers.map { case (c, _) => (c, context.inRetriever(c)) }
    val compiler = constructTestRefTreeCompiler(retrievers)

    // If the compiler defined a particular partitioner, use it. Otherwise, use same partitioner
    // used for partitioning queried metadata (context.defaultPartitioner), a reasonable default.
    val partitioner =
      compiler.outPartitioner(context.defaultParallelism).getOrElse(context.defaultPartitioner)

    Iterable(
      context
        .queryCatalogs(
          compiler.inLayers
        )
        .resolveReferences(
          compiler.refStructure,
          Function.untupled(compiler.resolveFn)
        )
        .flatMapGroup(
          { case (k, (v, refs)) => compiler.compileInFn((k, v), refs) },
          partitioner
        )
        .mapValuesWithKey(
          compiler.compileOutFn
        )
        .publish(
          compiler.outLayers
        )
    )
  }
}

Migrate Direct 1:N and M:N Compilers to DeltaSets

A Direct M:N Compiler, expressed in DeltaSet transformations, corresponds to:

  • computing the intermediate data for each input key-value pair using compileIn and mapValuesWithKey
  • mapping all input key-value pairs to the corresponding output keys using mappingFn and flatMapKeysGroup; here, the inverse of mappingFn has to be specified in addition
  • transforming the groups of intermediate data into payloads using the compileOut function
  • publishing the result

The full example is shown below. TODO tags identify the places where you define your intermediate data, input layers, the inverse of mappingFn and the DirectMToNCompiler:

import com.here.platform.data.processing.blobstore.Retriever
import com.here.platform.data.processing.catalog._
import com.here.platform.data.processing.compiler.direct.CompileInFn
import com.here.platform.data.processing.compiler.{CompileOut1To1Fn, DirectMToNCompiler}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._

object DirectMToNMain extends DeltaSimpleSetup {

  case class IntermediateData() // TODO: Define the intermediate data of the compiler

  def inverseMappingFn: Partition.Key => Iterable[Partition.Key] =
    ??? // TODO: Define the inverse of mappingFn

  def inLayers: Map[Catalog.Id, Set[Layer.Id]] =
    ??? // TODO: Define the input layers of the compiler

  def constructTestDirectMToNCompiler(retrievers: Map[Catalog.Id, Retriever]): DirectMToNCompiler[
    IntermediateData] with CompileInFn[IntermediateData] with CompileOut1To1Fn[IntermediateData] =
    ??? // TODO: Construct the compiler

  def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {

    import context.transformations._

    val retrievers = inLayers.map { case (c, _) => (c, context.inRetriever(c)) }
    val compiler = constructTestDirectMToNCompiler(retrievers)

    // If the compiler defined a particular partitioner, use it. Otherwise, use same partitioner
    // used for partitioning queried metadata (context.defaultPartitioner), a reasonable default.
    val partitioner =
      compiler.outPartitioner(context.defaultParallelism).getOrElse(context.defaultPartitioner)

    Iterable(
      context
        .queryCatalogs(
          compiler.inLayers
        )
        .mapValuesWithKey {
          case (k, v) =>
            compiler.compileInFn((k, v))
        }
        .flatMapKeysGroup(
          ManyToMany(
            compiler.mappingFn,
            inverseMappingFn
          ),
          partitioner
        )
        .mapValuesWithKey(
          compiler.compileOutFn
        )
        .publish(
          compiler.outLayers
        )
    )
  }
}

A Direct1ToNCompiler can be expressed using a similar sequence of transformations, replacing flatMapKeysGroup with flatMapKeys.

Spark RDD-based Patterns

Like an RDD, a DeltaSet represents data distributed over a cluster of machines that can be transformed using a set of functional operations like filterByKey or mapValues. In fact, a DeltaSet internally uses an RDD to represent that data. However, there are three main differences between using DeltaSets and using RDDs directly:

  1. Key-Value: In contrast to an RDD, a DeltaSet contains only key-value pairs and never contains duplicate keys. That is why, for example, there is no simple map operation available for DeltaSets, as it may create duplicate keys.

  2. Strongly partitioned: Furthermore, a DeltaSet is always partitioned according to a specific partitioner that assigns each key-value pair to one Spark partition, which in turn is stored on a specific machine in the cluster. An RDD, on the other hand, can store data without a specific partitioner being defined for how the data is partitioned.

  3. Incremental: Every computation expressed with a DeltaSet can be executed incrementally without manually keeping track of dependencies as done in the DepCompiler and IncrementalDepCompiler.
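
The first difference can be illustrated on local Scala collections. The sketch below shows how an unrestricted map over key-value pairs can produce duplicate keys, and how combining the values per key restores the one-value-per-key property. It does not use the DeltaSet or RDD API. The object DuplicateKeySketch and its functions are hypothetical.

```scala
object DuplicateKeySketch {

  // Maps each key to a coarser key (integer division by 10). Several input keys can
  // end up with the same output key, so the result may contain duplicate keys.
  def coarsen(pairs: Seq[(Int, Int)]): Seq[(Int, Int)] =
    pairs.map { case (k, v) => (k / 10, v) }

  // Restores the one-value-per-key property by reducing all values of a key.
  def reduceByKey(pairs: Seq[(Int, Int)])(reduce: (Int, Int) => Int): Map[Int, Int] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(reduce) }

  def main(args: Array[String]): Unit = {
    val coarse = coarsen(Seq(11 -> 1, 12 -> 2, 25 -> 3))
    println(coarse)                     // prints List((1,1), (1,2), (2,3)): key 1 occurs twice
    println(reduceByKey(coarse)(_ + _)) // prints Map(1 -> 3, 2 -> 3)
  }
}
```

This is one way to read why key-changing DeltaSet transformations in the table above come in variants such as mapKeysReduce and mapKeysGroup: whenever keys may collide, something has to decide how the colliding values are combined.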

Multi-compiler Tasks

A multi-compiler task can chain the effect of several compilers by uploading the result of one compiler to a catalog layer and downloading the content of that catalog layer in another compiler. DeltaSets can also chain the effect of several compilers. However, you can decide whether or not intermediate results should be published and read back from the output catalog, as done in a multi-compiler task. For more information on how to emulate a multi-compiler task using DeltaSets, see the readBack transformation.

IDs and Configuration

Sometimes it is useful to change the behavior of a DeltaSet transformation from a configuration file, for example, to tune the performance parameters of transformations without recompiling the application. Each transformation on a DeltaSet has a unique ID, used to identify every transformation in the configuration file. To configure a transformation from the configuration file, assign an ID to it by calling the withId function, which sets the ID of the transformation immediately preceding it.

val deltaSet1: DeltaSet[Key, Int] = ???
val doubled: DeltaSet[Key, Int] = deltaSet1.mapValues(_ * 2).withId("doublingMap")

Transformations to which you do not assign an ID in this way are automatically assigned an ID based on the order in which they appear in the source code. To determine the ID, call the id method on the DeltaSet that results from the transformation.

IDs are not only assigned to DeltaSets, but also to PublishedSets. Both share the functionality for identifying and configuring the classes in the common super-class BaseSet.

For establishing modularity within the pipeline, it can be useful to wrap a group of BaseSets into a common namespace. IDs then only need to be unique within this namespace, and the namespace is included in logging messages and the named RDDs in Spark UI. Use BaseSet.Namespace.enter to enter a new namespace. Namespaces can be nested.

val deltaSet1: DeltaSet[Key, Int] = ???
BaseSet.Namespace.enter("routingModule") {
  val doubled: DeltaSet[Key, Int] = deltaSet1
    .mapValues(_ * 2)
    .withId("doublingMap")
}

DeltaSets can be configured by adding a section to the pipeline's application.conf, which is described in Configuring the Library. You can configure both the defaults that apply to all transformations and each transformation individually. The snippet below shows the default configuration and describes each option that is defined in the here.platform.data-processing.deltasets.default path. To change the default configuration, copy this snippet into your application.conf and change the values accordingly.

// Configures the default settings for DeltaSet operations.
here.platform.data-processing.deltasets.default {

  // Where to store intermediate results that must be persisted. See
  // https://spark.apache.org/docs/latest/rdd-programming-guide.html#which-storage-level-to-choose
  // Applies to: flatMapReduce, flatMapGroup, mapReduce, mapGroup, mapValues, mapValuesWithKey,
  //             resolve, detectChanges, join.
  intermediateStorageLevel = "MEMORY_AND_DISK"

  // Defines how much extra processing should be done to detect invalid use of DeltaSets and
  // improve debugging.
  //
  // Possible values:
  //
  //   PERFORMANCE: Disables certain potentially expensive checks that help detecting incorrect
  //     uses of DeltaSets as early as possible. When this setting is used it can be harder to find
  //     root causes of a problem. In particular:
  //     - disjointUnion does not validate that the union does not create duplicate keys.
  //     - toDeltaSet does not validate that the given RDD does not contain duplicate keys.
  //
  //   SAFETY: Enable validation checks. Default.
  //
  //   DEBUG: Like "SAFETY", but also enable processing to improve debug output. In particular:
  //     - Logical errors such as IllegalArgumentException, are not thrown immediately during a
  //       stage that invokes user-defined functions. Instead, they are collected in a container in
  //       the driver, and thrown only once.
  //       Applies to: mapReduce/flatMapReduce (only mapFn parameter, not reduceFn),
  //                   mapGroup/flatMapGroup, mapValues/mapValuesWithKey and resolve.
  validationLevel = "SAFETY"

  // Number of threads to use for running user-provided functions, for example mapFn and resolveFn.
  // Increasing this number is useful when you have blocking I/O happening inside the function.
  // Applies to: mapReduce/flatMapReduce (only mapFn parameter, not reduceFn),
  //             mapGroup/flatMapGroup, mapValues/mapValuesWithKey and resolve.
  threads = 1

  // Processes the keys within a partition in sorted order. Sorting ensures that keys with
  // similar names are processed together. This can improve the performance if the map function
  // caches resources, such as downloaded payloads, and the same cache entries are likely to be
  // requested while processing keys that are nearby in the sorting order.
  // Applies to: mapReduce/flatMapReduce (only mapFn parameter, not reduceFn),
  //             mapGroup/flatMapGroup and mapValues/mapValuesWithKey.
  sorting = false

  // Can be used to disable incremental computation of a DeltaSet. This will cause upstream
  // and downstream DeltaSets to also be computed non-incrementally.
  // Applies to: all transformations except publish
  incremental = true

}

You can configure a transformation with the ID id in the here.platform.data-processing.deltasets.id path of the configuration. For example, the following snippet sets the number of threads that are used by the doublingMap transformation.

here.platform.data-processing.deltasets.doublingMap {
  threads = 3
}

Configuration files adhere to the namespaces of the transformation. For example, here.platform.data-processing.deltasets.namespace1 can be used to configure all transformations created in the namespace namespace1. If a transformation with ID id1 is created inside namespace1, then its configuration is constructed by applying the settings defined in here.platform.data-processing.deltasets.default, here.platform.data-processing.deltasets.namespace1 and here.platform.data-processing.deltasets.namespace1.id1 in this order.

The configuration read from the configuration file can also be overridden programmatically in the application code. Programmatic overrides take precedence over all settings defined in configuration files.

val deltaSet1: DeltaSet[Key, Int] = ???
val doubled: DeltaSet[Key, Int] = deltaSet1
  .mapValues(_ * 2)
  .withConfigOverride(
    c => c.withIncremental(false)
  )
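
The order in which these settings are combined can be sketched as a sequence of layers in which later layers override earlier ones. The following model uses plain Scala maps and made-up override values. The object ConfigLayeringSketch and the function effective are hypothetical and do not reflect how the library reads its configuration internally.

```scala
object ConfigLayeringSketch {

  type Settings = Map[String, String]

  // Combines the given layers from left to right; later layers override earlier ones.
  def effective(layers: Settings*): Settings =
    layers.foldLeft(Map.empty[String, String])(_ ++ _)

  def main(args: Array[String]): Unit = {
    // Values of the default section as listed in the snippet above.
    val default: Settings =
      Map("threads" -> "1", "sorting" -> "false", "incremental" -> "true")
    // Made-up overrides for a namespace, a transformation ID, and application code.
    val namespace1: Settings = Map("threads" -> "2")
    val id1: Settings = Map("sorting" -> "true")
    val programmaticOverride: Settings = Map("incremental" -> "false")

    val settings = effective(default, namespace1, id1, programmaticOverride)
    println(settings("threads"))     // prints 2: set by the namespace layer
    println(settings("sorting"))     // prints true: set by the transformation layer
    println(settings("incremental")) // prints false: set by the programmatic override
  }
}
```

Settings that no later layer mentions keep the value from the closest earlier layer, so a namespace or transformation section only needs to list the options it changes.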
