Publish Data

The image below illustrates the data publication model. You can mix all layer types in the same publication.

dataservice-view
Figure 1. dataservice-view

The HERE platform supports following types of layers. Please see the dedicated chapters for each of it.

  • versioned
  • volatile
  • index
  • stream

Publish to a Versioned layer

Simplified Publication Process

Just as with the simplified metadata publication for a versioned layer, you can publish both data and metadata with a single step. The snippet below automatically starts a batch publication, publishes the data and metadata using that batch publication, and finalizes the batch. The call finishes once the data has been processed and is available in the catalog.

Scala
Java
// create writeEngine for a catalog
val writeEngine = DataEngine().writeEngine(catalogHrn)

// list of dependencies for this publication
val dependencies = Seq.empty[VersionDependency]

val partitions: Source[PendingPartition, NotUsed] =
  Source(
    List(
      NewPartition(
        partition = newPartitionId1,
        layer = versionedLayerId,
        data = NewPartition.ByteArrayData(blobData)
      ),
      DeletedPartition(
        partition = deletedPartitionId,
        layer = versionedLayerId
      )
    )
  )

writeEngine.publishBatch2(parallelism = 10,
                          layerIds = Some(Seq(versionedLayerId)),
                          dependencies = dependencies,
                          partitions = partitions)
// create writeEngine for source catalog
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

// parallelism defines how many parallel requests would be made to fetch the data
int parallelism = 10;

// list of dependencies for this publication
List<VersionDependency> dependencies = Collections.emptyList();

NewPartition newPartition =
    new NewPartition.Builder()
        .withPartition(partitionId)
        .withData(blobData)
        .withLayer(layer)
        .build();
DeletedPartition deletedPartition =
    new DeletedPartition.Builder().withPartition(deletedPartitionId).withLayer(layer).build();

ArrayList<PendingPartition> partitionList = new ArrayList<>();
partitionList.add(newPartition);
partitionList.add(deletedPartition);

Source<PendingPartition, NotUsed> partitions = Source.from(partitionList);

CompletableFuture<Done> futurePublish =
    writeEngine
        .publishBatch2(parallelism, Optional.of(Arrays.asList(layer)), dependencies, partitions)
        .toCompletableFuture();

Distributed Publications

The HERE platform allows you to process and publish a large number of partitions in a distributed manner.

For versioned layers, this is a three-step process:

  • start the publication process to initiate a new batch publication and during which you receive a batch token, normally this operation happens on the master or a driver node in the cluster.
  • different workers upload data/metadata, attaching them to same batch token
  • once all data is sent to server, you needs to finalize the batch publication upload, normally this operation happens on the master or a driver node in the cluster.

Upon receiving the complete batch request, the server starts processing the publications to create the next catalog version.

The snippet below illustrates how to publish multiple requests to a versioned layer:

Scala
Java
// create publishApi and writeEngine for source catalog
val publishApi = DataClient().publishApi(catalogHrn, settings)
val writeEngine = DataEngine().writeEngine(catalogHrn)

// start batch publication
publishApi
  .startBatch2(None, Some(Seq(versionedLayerId)), Seq.empty)
  .flatMap { batchToken =>
    //start worker 1 with upload data and publishing metadata
    val worker1 =
      publishApi.publishToBatch(batchToken, partitions1.mapAsync(parallelism = 2) {
        partition =>
          writeEngine.put(partition)
      })

    //start worker 2 with upload data and publishing metadata
    val worker2 =
      publishApi.publishToBatch(batchToken, partitions2.mapAsync(parallelism = 2) {
        partition =>
          writeEngine.put(partition)
      })

    // wait until workers are done uploading data/metadata
    for {
      _ <- worker1
      _ <- worker2
    } yield batchToken

  }
  .flatMap { batchToken =>
    //signal to server complete batch publication
    publishApi.completeBatch(batchToken)
  }
// create publishApi and writeEngine for source catalog
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

// start a batch, publish partitions, complete batch
CompletableFuture<Done> futurePublish =
    publishApi
        .startBatch2(baseVersion, Optional.empty(), Collections.emptyList())
        .thenCompose(
            batchToken -> {
              // start worker 1 with upload data and publishing metadata
              Source<PendingPartition, NotUsed> partitionsOnWorker1 =
                  arbitraryPendingPartitions1;
              CompletableFuture<Done> worker1 =
                  publishApi
                      .publishToBatch(
                          batchToken,
                          partitionsOnWorker1.mapAsync(
                              2, partition -> writeEngine.put(partition)))
                      .toCompletableFuture();

              // start worker 2 with upload data and publishing metadata
              Source<PendingPartition, NotUsed> partitionsOnWorker2 =
                  arbitraryPendingPartitions2;
              CompletableFuture<Done> worker2 =
                  publishApi
                      .publishToBatch(
                          batchToken,
                          partitionsOnWorker2.mapAsync(
                              2, partition -> writeEngine.put(partition)))
                      .toCompletableFuture();

              // wait until workers are done upload
              return worker1.thenCombine(worker2, (done, done2) -> batchToken);
            })
        .thenCompose(
            batchToken -> {
              return publishApi.completeBatch(batchToken);
            })
        .toCompletableFuture();

Publish to a Stream Layer

Data published to a stream layer is not versioned. It becomes immediately available to consumers for processing. The data can be retrieved by subscribing to the stream layer.

The snippet below illustrates how to publish to a stream layer.

Scala
Java
// create writeEngine and queryApi for a catalog
val writeEngine = DataEngine().writeEngine(catalogHrn)
val queryApi = DataClient().queryApi(catalogHrn)

// subscribe to receive new publications from stream layer
queryApi.subscribe("stream-layer",
                   ConsumerSettings("test-consumer"),
                   partition => println("Received " + new String(partition.partition)))

val partitions =
  Source.single(
    NewPartition(
      partition = newPartitionId1,
      layer = streamingLayerId,
      data = NewPartition.ByteArrayData(blobData),
      dataSize = dataSize,
      checksum = checksum,
      compressedDataSize = compressedDataSize
    )
  )

writeEngine.publish(partitions)
// create writeEngine and queryApi for a catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

// subscribe to receive new publications from stream layer
queryApi.subscribe(
    "stream-layer",
    new ConsumerSettings.Builder().withGroupName("test-consumer").build(),
    partition -> processPartition(partition));

NewPartition newPartition =
    new NewPartition.Builder()
        .withPartition(partitionId)
        .withData(blobData)
        .withLayer(layer)
        .withDataSize(dataSize)
        .withCompressedDataSize(compressedDataSize)
        .withChecksum(checksum)
        .build();

Source<PendingPartition, NotUsed> partitions = Source.single(newPartition);

writeEngine.publish(partitions);

Publish to a Volatile Layer

A volatile layer is a key/value store where values for a given key can change and only the latest value is retrievable. As new data is published, old data is overwritten. You must publish a new version of the metadata if there are going to be breaking changes on the data that consumers are expecting to read in the layer.

If you need to publish a new version to a volatile layer, use version dependencies to upload the partitions using the batch publication.

The snippet below illustrates how to use version dependencies.

Scala
Java
// get base version to commit a new version
val publishApi = DataClient().publishApi(catalogHrn)

publishApi.getBaseVersion().flatMap { baseVersion =>
  // compute next version to be used in Md5BlobIdGenerator
  val nextVersion =
    baseVersion
      .map(_ + 1L)
      .getOrElse(0L)

  // create writeEngine for a catalog with a deterministic BlobIdGenerator
  val writeEngine =
    DataEngine().writeEngine(catalogHrn, new StableBlobIdGenerator(nextVersion))

  // list of dependencies for this publication
  val dependencies = Seq.empty[VersionDependency]

  // given a list partitions to commit
  val partitions: Source[PendingPartition, NotUsed] =
    Source(
      List(
        NewPartition(
          partition = newPartitionId1,
          layer = volatileLayerId,
          data = maybeEmptyData
        ),
        NewPartition(
          partition = newPartitionId2,
          layer = volatileLayerId,
          data = maybeEmptyData
        )
      )
    )

  // upload data
  val commitPartitions: Source[CommitPartition, NotUsed] =
    partitions.mapAsync(parallelism = 10) { pendingPartition =>
      writeEngine.put(pendingPartition)
    }

  // publish version to metadata
  publishApi
    .publishBatch2(baseVersion, Some(Seq(volatileLayerId)), dependencies, commitPartitions)
}
// get base version to commit a new version
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

publishApi
    .getBaseVersion()
    .thenCompose(
        baseVersion -> {
          // compute next version to be used in Md5BlobIdGenerator
          Long nextVersion = baseVersion.isPresent() ? baseVersion.getAsLong() + 1 : 0;

          // create writeEngine for a catalog with a deterministic BlobIdGenerator
          BlobIdGenerator idGenerator =
              new StableBlobIdGenerator.Builder().withVersion(nextVersion).build();

          WriteEngine writeEngine =
              DataEngine.get(myActorSystem).writeEngine(catalogHrn, idGenerator);

          // list of dependencies for this publication
          List<VersionDependency> dependencies = Collections.emptyList();

          NewPartition newPartition1 =
              new NewPartition.Builder()
                  .withPartition(partitionId1)
                  .withLayer(layer)
                  .withData(maybeEmptyData)
                  .build();

          NewPartition newPartition2 =
              new NewPartition.Builder()
                  .withPartition(partitionId2)
                  .withLayer(layer)
                  .withData(maybeEmptyData)
                  .build();

          ArrayList<PendingPartition> partitionList = new ArrayList<>();
          partitionList.add(newPartition1);
          partitionList.add(newPartition2);

          Source<PendingPartition, NotUsed> partitions = Source.from(partitionList);

          int parallelism = 10;

          // upload data
          Source<CommitPartition, NotUsed> commitPartitions =
              partitions.mapAsync(parallelism, writeEngine::put);

          // publish version to metadata
          CompletionStage<Done> done =
              publishApi.publishBatch2(
                  baseVersion,
                  Optional.of(Arrays.asList(layer)),
                  dependencies,
                  commitPartitions);
          return done;
        });

If you only need to update data in a volatile layer, use the generic publish method.

The snippet below illustrates how to use publish.

Scala
Java
// create queryApi for a catalog to find latest version
val queryApi = DataClient().queryApi(catalogHrn)

queryApi.getLatestVersion().flatMap { maybeLatestVersion =>
  val latestVersion =
    maybeLatestVersion.getOrElse(throw new IllegalArgumentException("No version found!"))

  // create writeEngine for a catalog with a deterministic BlobIdGenerator
  val writeEngine =
    DataEngine().writeEngine(catalogHrn, new StableBlobIdGenerator(latestVersion))

  val partitions: Source[PendingPartition, NotUsed] =
    Source(
      List(
        NewPartition(
          partition = newPartitionId1,
          layer = volatileLayerId,
          data = someData
        ),
        NewPartition(
          partition = newPartitionId2,
          layer = volatileLayerId,
          data = someData
        )
      )
    )

  // publish data without batch token
  partitions
    .mapAsync(parallelism = 10) { partition =>
      writeEngine.put(partition)
    }
    .runWith(Sink.ignore)
}
// create queryApi for a catalog to find latest version
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

CompletionStage<Done> completionStage =
    queryApi
        .getLatestVersion(OptionalLong.empty())
        .thenCompose(
            maybeLatestVersion -> {
              if (!maybeLatestVersion.isPresent())
                throw new IllegalArgumentException("No version found!");

              Long latestVersion = maybeLatestVersion.getAsLong();

              // create writeEngine for a catalog with a deterministic BlobIdGenerator
              BlobIdGenerator idGenerator =
                  new StableBlobIdGenerator.Builder().withVersion(latestVersion).build();

              WriteEngine writeEngine =
                  DataEngine.get(myActorSystem).writeEngine(catalogHrn, idGenerator);

              NewPartition newPartition1 =
                  new NewPartition.Builder()
                      .withPartition(partitionId1)
                      .withLayer(layer)
                      .withData(someData)
                      .build();

              NewPartition newPartition2 =
                  new NewPartition.Builder()
                      .withPartition(partitionId2)
                      .withLayer(layer)
                      .withData(someData)
                      .build();

              ArrayList<PendingPartition> partitionList = new ArrayList<>();
              partitionList.add(newPartition1);
              partitionList.add(newPartition2);

              Source<PendingPartition, NotUsed> partitions = Source.from(partitionList);

              int parallelism = 10;

              // publish data without batch token
              CompletionStage<Done> done =
                  partitions
                      .mapAsync(parallelism, writeEngine::put)
                      .runWith(Sink.ignore(), myMaterializer);

              return done;
            });

Delete from a Volatile Layer

When you need to delete the metadata and data from a volatile layer, you can use the following two step process.

  1. First, you delete the data using DataEngine.writeEngine with DeletePartition object. A DeletePartition is similar to NewPartition but contains the dataHandle instead of the payload. This was the referenced blob object (the data) is deleted.

  2. Second, you delete the metadata by using PublishApi.publishBatch with the CommitPartition object you got from the writeEngine API call before.

If you skip the second step you get the same result as if the volatile partition is expired; the data is gone but the metadata is still there.

The snippet below illustrates how to delete data and metadata fro volatile layer.

Scala
Java
// get base version to commit a new version
val publishApi = DataClient().publishApi(catalogHrn)

publishApi.getBaseVersion().flatMap { baseVersion =>
  // compute next version to be used in Md5BlobIdGenerator
  val nextVersion =
    baseVersion
      .map(_ + 1L)
      .getOrElse(0L)

  // create writeEngine for a catalog with a deterministic BlobIdGenerator
  val writeEngine =
    DataEngine().writeEngine(catalogHrn, new StableBlobIdGenerator(nextVersion))

  // list of dependencies for this publication
  val dependencies = Seq.empty[VersionDependency]

  val queryApi = DataClient().queryApi(catalogHrn)
  val filter = VolatilePartitionsFilter.byIds(Set(deletePartitionId1, deletePartitionId2))
  val partitions: Seq[Partition] = Await
    .result(queryApi.getVolatilePartitionsAsIterator(volatileLayerId, filter), Duration.Inf)
    .toSeq

  // prepare list of partitions to be deleted
  val deletePartitions: Source[PendingPartition, NotUsed] =
    Source(
      partitions.map {
        case referencePartition: ReferencePartition =>
          val dataHandle = referencePartition.getDataHandle
          val partitionId = referencePartition.partition
          DeletedPartition(
            partition = partitionId,
            layer = volatileLayerId,
            dataHandle = Some(dataHandle)
          )
      }.toList
    )

  // delete data
  val commitPartitions: Source[CommitPartition, NotUsed] =
    deletePartitions.mapAsync(parallelism = 10) { pendingPartition =>
      writeEngine.put(pendingPartition)
    }

  // publish version to metadata
  publishApi
    .publishBatch2(baseVersion, Some(Seq(volatileLayerId)), Seq.empty, commitPartitions)
}
// get the partitions from partitionIds
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
VolatilePartitionsFilter filter =
    new VolatilePartitionsFilter.Builder()
        .withIds(new HashSet<String>(Arrays.asList(partitionId1, partitionId2)))
        .build();

final List<Partition> partitions = new ArrayList<Partition>();
try {
  queryApi
      .getVolatilePartitionsAsIterator(layerId, filter, Collections.emptySet())
      .toCompletableFuture()
      .get()
      .forEachRemaining(partitions::add);
} catch (Exception exp) {
  partitions.clear();
}

// get base version to commit a new version
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

publishApi
    .getBaseVersion()
    .thenCompose(
        baseVersion -> {
          // compute next version to be used in Md5BlobIdGenerator
          Long nextVersion = baseVersion.isPresent() ? baseVersion.getAsLong() + 1 : 0;

          // create writeEngine for a catalog with a deterministic BlobIdGenerator
          BlobIdGenerator idGenerator =
              new StableBlobIdGenerator.Builder().withVersion(nextVersion).build();

          WriteEngine writeEngine =
              DataEngine.get(myActorSystem).writeEngine(catalogHrn, idGenerator);

          ArrayList<PendingPartition> partitionList = new ArrayList<>();
          for (Partition p : partitions) {
            if (p instanceof ReferencePartition) {
              ReferencePartition referencePartition = (ReferencePartition) p;
              partitionList.add(
                  new DeletedPartition.Builder()
                      .withLayer(layerId)
                      .withPartition(referencePartition.getPartition())
                      .withDataHandle(referencePartition.getDataHandle())
                      .build());
            }
          }

          Source<PendingPartition, NotUsed> pendingPartitions = Source.from(partitionList);

          int parallelism = 10;

          // upload data
          Source<CommitPartition, NotUsed> commitPartitions =
              pendingPartitions.mapAsync(parallelism, writeEngine::put);

          // publish version to metadata
          CompletionStage<Done> done =
              publishApi.publishBatch2(
                  baseVersion,
                  Optional.of(Arrays.asList(layerId)),
                  Collections.emptyList(),
                  commitPartitions);
          return done;
        });

Note: BlobIdGenerator

It is recommended to use the StableBlobIdGenerator to create the write engine for uploading volatile partitions. If you define you own BlobIdGenerator ensure that the method generateVolatileBlobId(partition) is stable i.e. for a certain partition it generates same blobId on each call. By default generateVolatileBlobId returns the result of generateBlobId. So if this method is stable it should be fine otherwise it must be overridden, as having a non stable blobIds for volatile partitions can create orphaned blobs.

Custom blobIdGenerator example bellow:

Scala
Java
class CustomBlobIdGenerator extends BlobIdGenerator {

  override def generateBlobId(partition: NewPartition): String =
    UUID.randomUUID.toString

  override def generateVolatileBlobId(partition: NewPartition): String =
    "volatile-partition-" + partition.partition

}
public class JavaCustomBlobIdGenerator implements BlobIdGenerator {
  @Override
  public String generateBlobId(NewPartition partition) {
    return UUID.randomUUID().toString();
  }

  @Override
  public String generateVolatileBlobId(NewPartition partition) {
    return "volatile-partition-" + partition.partition();
  }
}

Publish to an Index Layer

Data published to an index layer is not versioned but is indexed. To publish and index the data, you have two options:

  • You can separately publish and index the data by calling the methods WriteEngine.put and PublishApi.index.
  • Or you can call the method WriteEngine.uploadAndIndex that both publishes and indexes the data of a partition.

The snippet below illustrates how to create a new partition that can later be published to an index layer.

Scala
Java
// How to define NewPartition for Index layer
val newPartition = NewPartition(
  partition = "",
  layer = indexLayerId,
  data = ByteArrayData(bytes),
  fields = Some(
    Map(
      "someIntKey" -> IntIndexValue(42),
      "someStringKey" -> StringIndexValue("abc"),
      "someBooleanKey" -> BooleanIndexValue(true),
      "someTimeWindowKey" -> TimeWindowIndexValue(123456789L),
      "someHereTileKey" -> HereTileIndexValue(91956L)
    )),
  metadata = Some(
    Map(
      "someKey1" -> "someValue1",
      "someKey2" -> "someValue2"
    )),
  checksum = Some(checksum),
  crc = Some(crc),
  dataSize = Some(dataSize)
)
// How to define NewPartition for Index layer
NewPartition newPartition =
    new NewPartition.Builder()
        .withPartition("")
        .withLayer(indexLayerId)
        .withData(bytes)
        .addIntField("someIntKey", 42)
        .addStringField("someStringKey", "abc")
        .addBooleanField("someBooleanKey", true)
        .addTimeWindowField("someTimeWindowKey", 123456789L)
        .addHereTileField("someHereTileKey", 91956L)
        .addMetadata("someKey1", "someValue1")
        .addMetadata("someKey2", "someValue2")
        .withChecksum(Optional.of(checksum))
        .withDataSize(OptionalLong.of(dataSize))
        .build();

Note

The Metadata parameter is an additional key-value collection that is not related to any index key. A Metadata key is a user-defined field that can store extra information about a record such as the ingestion time: Map("ingestionTime" -> "1532018660873").

The NewPartition.fields class member is also called index attributes on the portal or indexDefinitions in the OLP CLI.

The snippet below illustrates how to upload data and to index partition with single method WriteEngine.uploadAndIndex.

Scala
Java
// The example illustrated how to upload data and to index partition
// with single method WriteEngine.uploadAndIndex
writeEngine.uploadAndIndex(Iterator(newPartition))
// The example illustrated how to upload data and to index partition
// with single method WriteEngine.uploadAndIndex
Iterator<NewPartition> partitions = Arrays.asList(newPartition).iterator();
CompletionStage<Done> publish = writeEngine.uploadAndIndex(partitions);

The snippet below illustrates how to upload data with WriteEngine.put and then to index partition with PublishApi.publishIndex

Scala
Java
// How to upload data with WriteEngine.put and
// index the partition with PublishApi.publishIndex
val putAndIndex: Future[Done] =
  for {
    commitPartition <- writeEngine.put(newPartition)
    _ <- publishApi.publishIndex(indexLayerId, Iterator(commitPartition))
  } yield Done
// How to upload data with WriteEngine.put and
// index the partition with PublishApi.publishIndex
CompletionStage<Done> putAndIndex =
    writeEngine
        .put(newPartition)
        .thenCompose(
            commitPartition -> {
              Iterator<CommitPartition> commitPartitions =
                  Arrays.asList(commitPartition).iterator();
              return publishApi.publishIndex(indexLayerId, commitPartitions);
            });

Update an Index Layer

When you need to change the data in an index layer, you can use the PublishApi.updateIndex API call. The method takes 3 arguments:

  • layer - The layer id of the layer which should be updated.
  • additions - A list of partitions to add. Note that you must first upload the data blob using WriteEngine.put before you can add the corresponding partition.
  • deletions - A list of partitions to delete.

The snippet bellow demonstrates the usage of the PublishApi.updateIndex API:

Scala
Java
val updateIndex: Future[Done] = {
  // partitions to add
  // see above how to define a new partition for an index layer
  val additions = Seq(newPartition)
  // partitions to remove
  // use CommitPartition.deletedIndexPartition to define a partition its data handle that
  // you plan to remove
  val removals = Seq(CommitPartition.deletedIndexPartition(dataHandle, indexLayerId))

  for {
    // first you have to upload corresponding blobs of the new partitions to the Blob Store
    committedAdditions <- Future.sequence(additions.map(p => writeEngine.put(p)))
    _ <- publishApi.updateIndex(indexLayerId,
                                committedAdditions.toIterator,
                                removals.toIterator)
  } yield Done
}
CompletionStage<Done> updateIndex =
    writeEngine
        // first you have to upload corresponding blobs of the new partitions
        // to the Blob Store
        .put(newPartition)
        .thenCompose(
            commitPartition -> {
              Iterator<CommitPartition> additions = Arrays.asList(commitPartition).iterator();

              // use DeleteIndexPartitionBuilder to define partitions that you plan to remove
              CommitPartition deletePartition =
                  new CommitPartition.Builder()
                      .deleteIndexPartition()
                      .withLayer(indexLayerId)
                      .withDataHandle(dataHandle)
                      .build();

              Iterator<CommitPartition> removals = Arrays.asList(deletePartition).iterator();

              return publishApi.updateIndex(indexLayerId, additions, removals);
            });

Delete from an Index Layer

When you need to delete the metadata and data in an index layer, you can use the PublishApi.deleteIndex API call. The delete operation will be scheduled when the PublishApi.deleteIndex API call is successful.

The method takes 2 arguments:

  • layer - The layer id of the layer from that some records should be deleted.
  • queryString - A string written in the RSQL query language to query the index layer.

The method returns:

  • deleteId - A string which can later be used to query the delete status.

For checking delete status, you can use QueryApi.queryIndexDeleteStatus API call.

The method takes one argument:

  • deleteId - The delete request id returned from PublishApi.deleteIndex API call.

The method returns:

  • DeleteIndexesStatusResponse - The response will provide information on the state and number of partitions deleted at the time of delete status request.

The snippet below demonstrates the usage of the PublishApi.deleteIndex API and the QueryApi.queryIndexDeleteStatus API:

Scala
Java
import scala.concurrent.Await
import scala.concurrent.duration._

val queryString = "someIntKey>42;someStringKey!=abc"
val deleteId =
  Await.result(publishApi.deleteIndex(indexLayerId, queryString), 45.seconds)

// Note that the delete operation for deleting records in index layer is an async operation
// This example will return the current status of the delete request
// If user wants to wait for the delete status to be in Succeeded state, user may have to
// perform multiple delete status calls
// It is recommended to use exponential backoff policy to reduce the rate of delete status
// calls to the server
val deleteStatusResponse =
  Await.result(queryApi.queryIndexDeleteStatus(indexLayerId, deleteId), 5.seconds)
println("Current state of index delete request is " + deleteStatusResponse.state)
String queryString = "someIntKey>42;someStringKey!=abc";
String deleteId =
    publishApi.deleteIndex(indexLayerId, queryString).toCompletableFuture().join();

// Note that the delete operation for deleting records in index layer is an async operation
// This example will return the current status of the delete request
// If user wants to wait for the delete status to be in Succeeded state, user may have to
// perform multiple delete status calls
// It is recommended to use exponential backoff policy to reduce the rate of delete status
// calls to the server
DeleteIndexesStatusResponse deleteStatusResponse =
    queryApi.queryIndexDeleteStatus(indexLayerId, deleteId).toCompletableFuture().join();
System.out.println(
    "Current state of index delete request is " + deleteStatusResponse.state());

results matching ""

    No results matching ""