Troubleshooting

Contents

Pipelines

Spark

Flink


Troubleshooting Pipelines

Q: How do I investigate a pipeline that fails before a logging URL is created?

A: Log in to the HERE platform portal and click Tools, then Monitoring and Alerts. Search the Splunk logs for the text "request_id". If there are multiple matching logs, narrow them down by the approximate time of the run. This scenario will be better handled in a future release.

Q: Why do I see some pipelines in the CLI but not in the platform portal, or in the platform portal but not in the CLI?

A: Pipelines are only visible to the group that was specified when the pipeline was created. The CLI client uses client credentials whereas the platform portal uses user credentials. The client credentials and user credentials must have privileges to access the same group.

Q: Why does my pipeline throw a "[DatastreamSource] fetchMessages request failed with invalid offset" error?

A: There are two known causes of this error:

  • The pipeline was paused for longer than the retention period of one of the input streams.
  • The pipeline is processing data more slowly than the input streams are receiving it. The unprocessed data eventually exceeds the retention period and is dropped by the stream layer before the pipeline reads it.
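For the second cause, you can roughly estimate how long the pipeline can fall behind before hitting this error, assuming constant input and processing rates and no backlog at the start (real traffic is rarely that steady). If the stream receives r_in messages per second and the pipeline processes r_out, the age of the next unprocessed message grows by 1 - r_out / r_in seconds every second, so it reaches the retention period T after T / (1 - r_out / r_in). A minimal sketch with a hypothetical helper name:

```java
/** Hypothetical helper: estimates when a lagging pipeline's read position falls out of retention. */
final class RetentionLagEstimate {

    private RetentionLagEstimate() {}

    /**
     * Hours until the oldest unprocessed message is older than the retention period,
     * assuming constant rates and zero lag at the start.
     * Returns positive infinity if the pipeline keeps up with the input.
     */
    static double hoursUntilDataLoss(double inputMsgsPerSec, double processedMsgsPerSec, double retentionHours) {
        if (inputMsgsPerSec <= 0 || processedMsgsPerSec >= inputMsgsPerSec) {
            return Double.POSITIVE_INFINITY; // lag does not grow, so retention is never exceeded
        }
        // Seconds of lag accumulated per second of wall-clock time
        double lagGrowthPerSecond = 1.0 - processedMsgsPerSec / inputMsgsPerSec;
        return retentionHours / lagGrowthPerSecond;
    }
}
```

For example, with 1,000 messages per second arriving, 800 processed per second, and a 24-hour retention period, hoursUntilDataLoss(1000, 800, 24) returns about 120 hours (5 days).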

Q: What does it mean when I get a "master URL" exception when trying to run a Data Processing pipeline locally?

For example:

A master URL must be set in your configuration at org.apache.spark.SparkContext.<init>(SparkContext.scala:379) at com.here.platform.data.processing.driver.DriverRunner$class.com$here$platform$data$processing$driver$DriverRunner$$newSparkContext
...

A: This error means that the Spark master URL is missing from the execution arguments of your Maven command.

Add the following to your Maven command line, keeping the quotes so that the shell passes it as a single argument: "-Dexec.args=--master local[*]"

For example:

mvn exec:java -Dexec.cleanupDaemonThreads=false \
  -Dexec.mainClass=com.here.platform.examples.location.batch.Main \
  "-Dexec.args=--master local[*]" \
  -Dpipeline-config.file=config/pipeline-config.conf \
  -Dpipeline-job.file=config/pipeline-job.conf

Q: When creating Pipeline Versions, I sometimes experience JsonParsingException errors. What can I do?

Cause: This is an intermittent error that can occur with pipeline message calls.

Solution: This problem has been difficult to reproduce and isolate. Despite the error message, the command works as intended; only the response containing the Pipeline Version ID is lost. If you see this error, you can verify that your Pipeline Version was created successfully, and get its Pipeline Version ID, with the following CLI command:

pipeline.py pipeline-version list <pipeline-id>

Q: How can I include credentials in my pipeline JAR file?

A: Adding credentials to the pipeline JAR file is strongly discouraged for security reasons. The platform manages the credentials of the pipeline on behalf of the user. To learn more about setting groups and permissions, see the Teams and Permissions User Guide.

Q: How do I restrict access to my pipeline?

A: This can be achieved with groups. The Pipeline API supports specifying a group when creating a pipeline. Users who belong to that group can access the pipeline, whereas users outside of that group cannot. To learn more about setting groups for your account, see the Teams and Permissions User Guide.

Q: Why does my pipeline JAR file fail to deploy?

A: The most common reasons for a pipeline failing to deploy include one or more of the following:

  • You do not have the 'rights' or 'credentials' necessary to deploy a pipeline.
  • Your pipeline JAR file is larger than 500 MB, the maximum size that can be deployed.
  • Your pipeline JAR file name is longer than the maximum of 200 characters, so the file cannot be processed.
  • The pipeline is unavailable or has committed all available resources. Consult your system administrator to fix the situation.
  • The upload (POST request) did not complete within 50 minutes, so the remote host closed the connection and returned an error.
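Two of these limits can be checked locally before uploading. The following is a minimal sketch, assuming the size and file-name limits stated above apply as written; JarPrecheck is a hypothetical helper, and reading 500 MB as 500 * 1024 * 1024 bytes is an assumption:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical pre-deployment check for the JAR size and file-name limits. */
final class JarPrecheck {

    // Assumed interpretation of the documented limits
    static final long MAX_JAR_BYTES = 500L * 1024 * 1024;
    static final int MAX_FILE_NAME_CHARS = 200;

    private JarPrecheck() {}

    /** Returns human-readable problems; an empty list means both checks passed. */
    static List<String> check(Path jar) throws IOException {
        List<String> problems = new ArrayList<>();
        String name = jar.getFileName().toString();
        if (name.length() > MAX_FILE_NAME_CHARS) {
            problems.add("file name has " + name.length() + " characters (max " + MAX_FILE_NAME_CHARS + ")");
        }
        long size = Files.size(jar);
        if (size > MAX_JAR_BYTES) {
            problems.add("file is " + size + " bytes (max " + MAX_JAR_BYTES + ")");
        }
        return problems;
    }
}
```

Running this against the fat JAR in target/ as part of a build script catches the two purely local causes before the upload starts; the permission and resource causes can only be diagnosed on the platform.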

Q: How do I find events in the log that came just before a failing pipeline error?

A: The log files can be used to look at the entire history of events. First, locate the log entry for the failure event. Then, examine prior log entries to see what happened before the failure. It is a 3-step process:

  1. When looking at the log in Splunk, expand the log entry in the column labeled 'i'.

    Screen capture of Splunk log display.
    Figure 1. Splunk log display
  2. Scroll down to find and open the Event Actions drop-down menu. Select "Show Source".

    Screen capture of event actions drop-down menu.
    Figure 2. Event Actions drop-down menu
  3. The log source opens and displays the events that occurred before the error event.

    Screen capture of Splunk log entries around failure event.
    Figure 3. Log entries around failure event.

Q: How do I fix my pipeline when I get this error message while running the pipeline: java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.recordStats()Lcom/google/common/cache/CacheBuilder;?

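A: Relocate the com.google.common package in the maven-shade-plugin configuration (the plugin that builds the fat JAR file). The error typically indicates that a different Guava version is loaded at runtime than the one your code was compiled against; relocation gives your JAR its own private copy. For example: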

<project>
...
  <profiles>
    <profile>
      <id>platform</id>
      <build>
        <pluginManagement>
          <plugins>
            <plugin>
              <artifactId>maven-shade-plugin</artifactId>
              <executions>
                <execution>
                  <configuration>
                    <relocations combine.children="append">
                      <relocation>
                        <pattern>com.google.common</pattern>
                        <shadedPattern>${project.groupId}.shaded.com.google.common</shadedPattern>
                        <!-- WORKAROUND: until pipeline provided guava gets in-sync with environment pom -->
                      </relocation>
                    </relocations>
                  </configuration>
                </execution>
              </executions>
            </plugin>
          </plugins>
        </pluginManagement>
      </build>
    </profile>
  </profiles>
...
</project>
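To confirm that a NoSuchMethodError like this one comes from a different Guava copy at runtime, you can check from inside the pipeline which JAR supplied the class and whether that copy has the method. The following is a hypothetical, reflection-based sketch (ClasspathProbe and its methods are illustrative names) that uses only the JDK, so it compiles even without Guava on the classpath:

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical diagnostic: where a class was loaded from and whether it has a given method. */
final class ClasspathProbe {

    private ClasspathProbe() {}

    private static Class<?> load(String className) throws ClassNotFoundException {
        // 'false' avoids running the class's static initializers just for the probe
        return Class.forName(className, false, ClasspathProbe.class.getClassLoader());
    }

    /** True if the class is loadable and has a public no-argument method with this name. */
    static boolean hasNoArgMethod(String className, String methodName) {
        try {
            load(className).getMethod(methodName);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    /** JAR or directory the class came from, or a marker if it cannot be determined. */
    static String locationOf(String className) {
        try {
            CodeSource source = load(className).getProtectionDomain().getCodeSource();
            URL location = source == null ? null : source.getLocation();
            return location == null ? "<bootstrap or unknown>" : location.toString();
        } catch (ClassNotFoundException e) {
            return "<not on classpath>";
        }
    }
}
```

For example, logging ClasspathProbe.locationOf("com.google.common.cache.CacheBuilder") and ClasspathProbe.hasNoArgMethod("com.google.common.cache.CacheBuilder", "recordStats") from the driver shows which Guava JAR the JVM picked up and whether that copy has the method.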

Q: How do I change the input catalogs used by a Pipeline Version?

A: There is no direct way to change the input catalogs associated with a Pipeline Version, but there is a way to achieve the same results. To do this, the Pipeline Version needs to be upgraded with a new Pipeline Version that uses the same template and configuration values, except for one or more specified input catalogs. For example, you can use the CLI with the following procedure.

  1. Create a new Pipeline Version using the same Pipeline Template and a new pipeline-config.conf file that specifies the new input catalogs.
  2. Use the pipeline upgrade procedure to replace the existing Pipeline Version with the new Pipeline Version that targets the new input catalogs.

Q: What does error code MSG1000 mean and can it be resolved?

A: There is timeout logic in place that waits for the Spark or Flink cluster to initialize for a new job. If the Spark or Flink cluster is not initialized within the expected timeframe (currently 1 hour), then the pipeline job is marked as failed, and its resources are deleted.

The most common cause of this issue is a lack of resources within the platform to create the Spark or Flink cluster with the specified number of workers, CPUs, and so on.

Remedy

Possible remedies for short-term issues:

  • If this Pipeline Version was configured with a substantial amount of resources, try reducing the number of workers and/or the total CPUs requested by about half, to give the cluster a better chance of starting successfully.
  • Otherwise, the platform automatically attempts to run the job again at the next 5-minute interval.
  • However, if this is a batch Pipeline Version configured to run only once, the platform does not retry it after a failure unless the user explicitly re-activates the Pipeline Version.

If this continues to be an issue, please file a Support ticket.

Q: What does error code MSG2000 mean?

A: There is timeout logic in place that waits for the pipeline on Spark or Flink to start running. This timeout takes place after the Spark or Flink cluster has been successfully created.

If the job does not start running within the expected timeframe (currently 3 minutes), then the pipeline job is marked as failed, and its resources are deleted.

Check the Splunk logs for more details (using the loggingUrl of the Pipeline Version), or run the pipeline with the logging level set to debug so that more details are captured.

Remedy:

The message "MSG2000" means that the Spark job was never submitted to the pipeline's Spark cluster, resulting in a timeout. This can happen for different reasons, but one common cause is that the user hard-coded the Spark master property to local[*], as shown here:

SparkConf conf = new SparkConf();
conf.setMaster("local[*]"); // Anti-pattern: overrides the master provided by the platform
context = new JavaSparkContext(conf);

This overrides the master configuration set by the platform, so the job runs entirely inside the driver instead of on the Spark cluster. Pipeline Management cannot monitor the status of the job and fails it after the timeout.

This type of misconfiguration can have several different symptoms.

  • After the timeout period, the pipeline is reported as failed and an MSG2000 error is reported.
  • All logging in Splunk is shown under "source=driver" and no logs are shown under "source=executor".
  • In the Splunk logs, the Spark job appears to be running, because log messages show tasks being executed (requires the info or debug logging level).
  • The pipeline may even produce data in the output catalog.
  • Because all execution is done in the driver, the JVM may throw an OutOfMemoryError.

Although it is common to set the master to local[*] for local testing, this setting must not be applied when the code is deployed to the platform. One way to achieve this is shown below.

SparkConf conf = new SparkConf();
if (!conf.contains("spark.master")) {
  // Only reached when no master was supplied, for example in a local run
  LOGGER.warn("No master set, using local[*]");
  conf.setMaster("local[*]");
}
context = new JavaSparkContext(conf);

This makes Spark fall back to local[*] only if the master is not provided by spark-submit.
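The default SparkConf constructor also loads Java system properties whose names start with spark., so a master passed in that way is what conf.contains("spark.master") detects. Assuming that is how the master reaches your driver, the same fallback can be expressed without a Spark dependency, which makes it easy to unit-test. MasterResolver is a hypothetical name:

```java
import java.util.Properties;

/** Hypothetical helper mirroring the SparkConf fallback, without a Spark dependency. */
final class MasterResolver {

    static final String LOCAL_FALLBACK = "local[*]";

    private MasterResolver() {}

    /** Returns spark.master from the given properties, or local[*] when it is not set. */
    static String resolveMaster(Properties props) {
        return props.getProperty("spark.master", LOCAL_FALLBACK);
    }
}
```

In a driver, this would be called as MasterResolver.resolveMaster(System.getProperties()).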

Q: What does error code MSG3000 mean?

A: This issue only applies to Spark jobs. It is based on a timeout (currently 5 minutes) that can occur when the Spark Context is closed but the Spark Driver hangs on exit.

When this occurs, the return code from the Spark Driver is not available yet, because the driver process has not exited.

Note

Although the Job is marked as Failed in the platform, the Spark job may actually have completed successfully.

Remedy:

  • Verify the exit logic. For example, check for infinite or long-running loops after the Spark Context is closed.
  • Check whether any custom threads are still running, and make sure they are shut down properly.
  • Remove explicit closure of the Spark Context from the Spark job code as it will automatically be closed on exit.
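For the custom-thread item above, one common approach is to run background work on an ExecutorService that is always shut down in a finally block and whose threads are daemon threads, so they cannot keep the driver JVM alive after the Spark Context is closed. A minimal sketch; BackgroundWork and its methods are hypothetical names, and the 30-second wait is an arbitrary choice:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical example of background work that cannot block driver exit. */
final class BackgroundWork {

    private BackgroundWork() {}

    /** Creates a pool of daemon threads, so the JVM can exit even if one is still running. */
    static ExecutorService newDaemonPool(int threads) {
        return Executors.newFixedThreadPool(threads, runnable -> {
            Thread t = new Thread(runnable, "pipeline-background");
            t.setDaemon(true);
            return t;
        });
    }

    /** Runs a task, then always shuts the pool down and waits briefly for it to finish. */
    static <T> T runAndShutdown(ExecutorService pool, Callable<T> task) throws Exception {
        try {
            Future<T> result = pool.submit(task);
            return result.get();
        } finally {
            pool.shutdown(); // stop accepting new work
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // interrupt anything still running
            }
        }
    }
}
```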

Q: What does error code MSG4000 mean?

A: This issue only applies to Flink jobs. When a running Flink job fails, it first switches to the FAILING state and then to the FAILED state. If it does not switch from FAILING to FAILED within the expected timeframe (currently 20 minutes), the pipeline job is marked as failed with this message, and its resources are deleted.

Remedy:

Check the Splunk logs for more details (using the loggingUrl of the Pipeline Version), or run the pipeline with the logging level set to debug so that more details are captured.

Q: I have a service hosted in AWS and accessible from the HERE network. What is the proper way to consume it from a pipeline running in the HERE platform?

A: The pipeline executor can access the external service with an HTTPS call from the pipeline. However, the external service cannot access any of the HERE platform's pipeline components. While it is possible for pipelines to access the Internet outside of the HERE platform, we don’t officially support this behavior. We will assess it further and may discontinue it in future releases, so take caution when designing pipelines that connect to external resources.

Q: I am experiencing repeated pipeline failures due to failure to get blocks or failure to connect to node. Smaller jobs have run with no problem. What can cause this?

A: This can happen on jobs with large data volumes when a worker (node) is lost. The typical cause is an OutOfMemoryError, which may result from a cluster that is too small or from insufficient memory allocated to the JVM. If there is no OutOfMemoryError message in the log file, use Grafana to check the JVM metrics and see whether memory is actually running out when the worker disappears.
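As a complement to Grafana, driver or executor code can log its own heap usage through the standard java.lang.management API, which makes it easier to line up memory pressure with the time a worker disappears. A minimal sketch; HeapReport is a hypothetical helper, and how often you call it is up to you:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

/** Hypothetical helper that reports JVM heap usage for log correlation. */
final class HeapReport {

    private HeapReport() {}

    /** Fraction of the maximum heap currently used, or -1 if the JVM reports no maximum. */
    static double heapUsedFraction() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long max = heap.getMax(); // -1 when the maximum is undefined
        return max <= 0 ? -1.0 : (double) heap.getUsed() / max;
    }

    /** One-line summary in megabytes, suitable for a log message. */
    static String describe() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return String.format("heap used=%d MB committed=%d MB max=%d MB",
                heap.getUsed() >> 20, heap.getCommitted() >> 20, heap.getMax() >> 20);
    }
}
```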



Troubleshooting Spark

Q: Why should I use the Spark UI?

A: The Spark framework includes a Web Console that is active for all Spark jobs in the Running state. It is called the Spark UI and can be accessed directly from within the platform. The Spark UI provides insight into batch pipeline processing, including jobs, stages, execution graphs, and logs from the executors. The Data Processing Library components also publish various statistics (see Spark AccumulatorV2), such as the number of metadata partitions read, the number of data bytes downloaded or uploaded, and so on. This data can be seen in the stages where the operations were performed.

For locally executed pipelines, the driver launches the UI web server as part of the driver process. While the driver is running, developers can access the web server from http://127.0.0.1:4040/jobs. The PipelineRunner has a handy --no-quit option that developers can use to make it wait for an ENTER key press before exiting after the final commit.
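If your driver does not use PipelineRunner, you can get a similar effect by blocking before the driver exits. The sketch below uses a hypothetical WaitForEnter helper that reads from an InputStream, so it can also be fed from a test. Call it before the Spark context is stopped, because the Spark UI shuts down together with the context.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: keeps a local driver alive until ENTER is pressed. */
final class WaitForEnter {

    private WaitForEnter() {}

    /**
     * Prints a prompt and blocks until a line is read.
     * Returns true if a line was read, false if the input ended first.
     * The reader is deliberately not closed, so System.in stays usable.
     */
    static boolean await(InputStream in, String prompt) throws IOException {
        System.out.println(prompt);
        BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        return reader.readLine() != null;
    }
}
```

For example: WaitForEnter.await(System.in, "Spark UI at http://127.0.0.1:4040/jobs, press ENTER to exit");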

For batch pipelines running on the platform, you can access the Spark UI from the pipeline job details via the CLI or the Web Portal. In the platform portal, an Open Spark UI link appears when the job has started processing data; it takes you to the Spark UI of your running job.

Starting with the batch-2.1.0 runtime environment, you can also access the Spark UI after the pipeline job has completed its run. The runtime data for a completed job remains accessible through the Spark UI for 30 days after completion. After this period, the job's runtime data is deleted and the Open Spark UI link is no longer available in the Web Portal.

For additional information on troubleshooting with the Spark UI, see Spark UI.

Q: Why am I getting a Task Not Serializable Exception?

A: The "Task not serializable" exception is one of the most common errors in Spark development, especially when using complex class hierarchies. Whenever a function is executed as a Spark lambda function, all of the variables it refers to (its closure) are serialized and sent to the workers. In most cases, the easiest fix is to declare the function in an object instead of in a class or inline, and to pass all the required state information as parameters to the function.

If a lambda function needs a non-serializable state, such as a cache, a common pattern is a lazy val in the object that is initialized in every worker when accessed the first time. The val should also be marked @transient to ensure it will not be serialized via references.
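In Java, the closest equivalent of a lazy val in a Scala object is a static field initialized on first use, for example with the initialization-on-demand holder idiom. Static fields are never serialized, so they do not travel with the closure. The sketch below is a hypothetical illustration (LocalCache, WorkerState, and LookupFunction are made-up names, and java.util.function.Function plus Serializable stands in for a Spark function interface): the function itself carries only serializable state, and each JVM creates its own cache the first time a task uses it.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Stand-in for non-serializable state such as a client or cache (hypothetical). */
final class LocalCache {
    private final Map<String, Integer> values = new ConcurrentHashMap<>();

    int lookup(String key) {
        // Placeholder for an expensive computation or remote call
        return values.computeIfAbsent(key, String::length);
    }
}

/** Per-JVM holder: the cache is created the first time a task on this worker touches it. */
final class WorkerState {
    private WorkerState() {}

    private static final class Holder {
        static final LocalCache CACHE = new LocalCache(); // lazy, thread-safe class initialization
    }

    static LocalCache cache() {
        return Holder.CACHE;
    }
}

/** The function sent to workers holds only serializable fields; the cache is reached statically. */
final class LookupFunction implements Function<String, Integer>, Serializable {
    private static final long serialVersionUID = 1L;

    private final int offset; // plain state passed in as a parameter

    LookupFunction(int offset) {
        this.offset = offset;
    }

    @Override
    public Integer apply(String key) {
        return WorkerState.cache().lookup(key) + offset;
    }
}
```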

For performance reasons, the Data Processing Library heavily uses the Kryo serialization framework. This framework is used by Spark to serialize and deserialize objects present in RDDs. This includes widely used concepts such as partition keys and metadata, but also custom types used by developers identified with T in compiler patterns. In addition, in RDD-based patterns, developers are free to introduce any custom type and declare and use RDDs of such types.

The processing library can’t know all the custom types used in an application, but the Kryo framework needs this information. Therefore, developers need to provide a custom KryoRegistrator.

For example:


    package com.mycompany.myproject

    class MyKryoRegistrator extends com.here.platform.data.processing.spark.KryoRegistrator {

        override def userClasses: Seq[Class[_]] = Seq(
            classOf[MyClass1],
            classOf[MyClass2]
        )
    }

The name of the class must be provided to the library configuration via application.conf:


    spark {
        kryo.registrationRequired = true
        kryo.registrator = "com.mycompany.myproject.MyKryoRegistrator"
    }



Troubleshooting Flink

Q: Why should I use the Flink Dashboard?

A: The Flink framework includes a Web Interface that is available for all Flink jobs in the Running state. It is called the Flink Dashboard and can be accessed directly from within the platform. The Flink Dashboard provides insight into the running stream pipeline, including an overview of the Flink cluster and job details such as metrics, checkpointing, and backpressure. You can open the Flink Dashboard directly from the pipeline jobs list in the platform portal.

For additional information on troubleshooting with the Flink Dashboard, see Flink Dashboard.

Q: How do I handle fatal failures in data reading and data writing?

A: When a Pipeline Version writes to a stream layer for an extended period of time (such as a week or more), rare TLS handshake failures of outgoing HTTPS connections can occur. These failures surface in the SinkFunction returned by FlinkWriteEngine::publish, which publishes data inside a streaming Flink pipeline. If an exception thrown while the sink function is invoked is not caught and logged, it becomes a fatal exception while sending data to the Flink sink through the data client, and the Pipeline Version fails, changing its state from RUNNING to FAILED.

Resolution:

To avoid this, wrap the SinkFunction returned by FlinkWriteEngine::publish in your own SinkFunction that catches and logs these exceptions. The Pipeline Version then continues to run and process messages. The following sample code demonstrates such a wrapper.

import com.here.platform.data.client.model.PendingPartition;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VSSFlinkSinkFunction implements SinkFunction<PendingPartition> {

    private static final long serialVersionUID = 6118402368186172504L;

    // SLF4J is available in Flink jobs; substitute your own logging utility if needed
    private static final Logger LOG = LoggerFactory.getLogger(VSSFlinkSinkFunction.class);

    private final SinkFunction<PendingPartition> writeEngineSinkFunction;

    public VSSFlinkSinkFunction(SinkFunction<PendingPartition> writeEngineSinkFunction) {
        this.writeEngineSinkFunction = writeEngineSinkFunction;
    }

    // Flink calls invoke(value, context); forwarding this variant works whether the
    // wrapped sink implements invoke(value) or invoke(value, context)
    @Override
    public void invoke(PendingPartition pendingPartition, Context context) {
        try {
            writeEngineSinkFunction.invoke(pendingPartition, context);
            LOG.info("SEND_MESSAGE_SUCCESSFUL | partition={}", pendingPartition.getPartition());
        } catch (Exception exception) {
            // Log and continue instead of letting the exception fail the Pipeline Version
            LOG.error("SEND_MESSAGE_FAILED | partition={}", pendingPartition.getPartition(), exception);
        }
    }
}

Q: Can I use Accumulators or Counters in Flink?

A: Yes. For more information, see Accumulators & Counters.

Q: Logs written from inside the executable JAR file of a Flink pipeline are not visible in Splunk.

A: This is a known issue in Flink. The workaround is to point System.out and System.err back at the process's standard output and standard error streams by adding the following lines as the first statements in the main method:

// Requires imports of java.io.FileDescriptor, java.io.FileOutputStream, and java.io.PrintStream
System.setOut(new PrintStream(new FileOutputStream(FileDescriptor.out)));
System.setErr(new PrintStream(new FileOutputStream(FileDescriptor.err)));

For more information, see the bug: FLINK-15504

Q: A pipeline fails with a message "JAR file does not exist", but the template is successfully created.

A: This error message can appear if a JAR file contains errors that can lead to unbounded memory usage. The message does not point at the root cause of the issue because this is an internal Flink error and cannot be overridden. Test your JAR file on a local Flink instance.

Q: A stream pipeline fails because the TaskManager has run out of disk space.

A: Flink has a known bug (FLINK-15068): RocksDB logging defaults to the INFO level with no size limit, so the RocksDB logs can fill the disk space allocated to the TaskManager. This issue might manifest itself as the following exception:

"Caused by: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-"

The workaround is to set up a RocksDBStateBackend inside the Flink job. This requires the checkpoint URL and the database storage path that RocksDB should use; the pipeline runtime environment passes both values as Java system properties. The following code snippets show how to read these values from the system properties and override the RocksDB options to minimize logging.

  1. Add the following dependency to the pom.xml file. It can be excluded from the fat JAR file (provided scope), since it is available in the runtime environment of the pipeline.

     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
    
  2. Create a CustomOptionsFactory class:

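    import org.apache.flink.contrib.streaming.state.OptionsFactory;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;
    import org.rocksdb.InfoLogLevel;

    public class CustomOptionsFactory implements OptionsFactory {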
    
     @Override
     public DBOptions createDBOptions(DBOptions dbOptions) {
       // Refer to https://javadoc.io/static/org.rocksdb/rocksdbjni/5.7.5/org/rocksdb/Options.html
       // Minimal logging
       dbOptions.setInfoLogLevel(InfoLogLevel.HEADER_LEVEL);
       // to stop dumping rocksdb.stats to LOG
       dbOptions.setStatsDumpPeriodSec(0);
       return dbOptions;
     }
    
     @Override
     public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions columnFamilyOptions) {
       return columnFamilyOptions;
     }
    }
    
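  3. Set up the RocksDBStateBackend inside the main class:

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Inside main(String[] args) throws Exception: the RocksDBStateBackend constructor declares IOException
    StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();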
    ParameterTool systemParameters = ParameterTool.fromSystemProperties();
    // Use the Checkpoint URL provided by platform environment
    String checkpointUrl = systemParameters.get("checkpointUrl");
    // Use the disk space provided by platform environment for RocksDB local files
    String dbStoragePath = systemParameters.get("dbStoragePath");
    RocksDBStateBackend stateBackend = new RocksDBStateBackend(checkpointUrl, true);
    stateBackend.setDbStoragePath(dbStoragePath);
    
    stateBackend.setOptions(new CustomOptionsFactory());
    
    streamExecutionEnvironment.setStateBackend(stateBackend);
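ParameterTool.get returns null when a key is missing, so if these properties are absent (for example, in a local run where the platform does not set them), the failure only shows up later in the state backend setup. A minimal fail-fast sketch using only the JDK; RequiredProperties is a hypothetical helper name:

```java
import java.util.Properties;

/** Hypothetical fail-fast lookup for the system properties the RocksDB setup depends on. */
final class RequiredProperties {

    private RequiredProperties() {}

    /** Returns the property value, or throws with a message naming the missing key. */
    static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing system property '" + key
                    + "'; it is expected to be provided by the pipeline runtime environment");
        }
        return value;
    }
}
```

For example, String checkpointUrl = RequiredProperties.require(System.getProperties(), "checkpointUrl"); can take the place of systemParameters.get("checkpointUrl").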
    

Q: Tasks are not evenly spread across all TaskManagers of a stream pipeline.

A: By default, Flink (Stream-2.0.0 and Stream-3.0.0) uses all the slots of a TaskManager before using another TaskManager. So, for stream pipelines whose number of slots exceeds the parallelism of the Flink job, some TaskManagers have all their slots used while others have slots to spare. You can control this behavior by setting the Flink configuration cluster.evenly-spread-out-slots: true in the stream configuration. This property is set to false by default for stream pipelines. For more information, see the Flink ticket FLINK-12122.

Note

This configuration is only available for the Stream-3.0.0 (Flink 1.10.1) runtime environments.
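To illustrate the difference, the following toy model (hypothetical, and far simpler than Flink's actual slot allocation) places a job's parallel tasks either by filling each TaskManager before moving to the next, as described above for the default, or round-robin, which approximates the effect of cluster.evenly-spread-out-slots: true:

```java
/** Toy model of slot placement; not Flink's scheduler, only an illustration of the two strategies. */
final class SlotPlacement {

    private SlotPlacement() {}

    /** Fill each TaskManager completely before using the next one (default-like behavior). */
    static int[] fillFirst(int taskManagers, int slotsPerTm, int parallelism) {
        int[] used = new int[taskManagers];
        int remaining = Math.min(parallelism, taskManagers * slotsPerTm);
        for (int tm = 0; tm < taskManagers && remaining > 0; tm++) {
            used[tm] = Math.min(slotsPerTm, remaining);
            remaining -= used[tm];
        }
        return used;
    }

    /** Hand out slots round-robin so usage differs by at most one slot per TaskManager. */
    static int[] spreadEvenly(int taskManagers, int slotsPerTm, int parallelism) {
        int[] used = new int[taskManagers];
        int remaining = Math.min(parallelism, taskManagers * slotsPerTm);
        for (int i = 0; remaining > 0; i = (i + 1) % taskManagers) {
            if (used[i] < slotsPerTm) {
                used[i]++;
                remaining--;
            }
        }
        return used;
    }
}
```

With 3 TaskManagers of 4 slots each and a parallelism of 6, fillFirst returns [4, 2, 0] and spreadEvenly returns [2, 2, 2].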

Q: How can I see the percentage CPU usage of jobmanager or taskmanagers of a Stream pipeline?

A: The following query computes the CPU usage of each taskmanager container as a fraction of its CPU quota, using the metrics reported by the underlying infrastructure.

sum(rate(container_cpu_usage_seconds_total{pod=~"job-$jobId-tm-.*", container="taskmanager"}[5m])) by (pod) 
/ sum(container_spec_cpu_quota{pod=~"job-$jobId-tm-.*", container="taskmanager"}
/ container_spec_cpu_period{pod=~"job-$jobId-tm-.*", container="taskmanager"}) by (pod)

Note

  • Replace $jobId with the jobId value of the pipeline, or set a $jobId variable in the Grafana dashboard.
  • The unit in Grafana for the Left Y axis should be percent(0.0-1.0).

Similarly, for the jobmanager, use the following query:

sum(rate(container_cpu_usage_seconds_total{pod=~"job-$jobId-jm-.*", container="jobmanager"}[5m])) by (pod) 
/ sum(container_spec_cpu_quota{pod=~"job-$jobId-jm-.*", container="jobmanager"}
/ container_spec_cpu_period{pod=~"job-$jobId-jm-.*", container="jobmanager"}) by (pod)
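The query divides the CPU time a container consumes per second by the number of cores it is allowed to use (container_spec_cpu_quota / container_spec_cpu_period), which is why the result is a fraction between 0 and 1. The same arithmetic as a small sketch, with hypothetical names and sample numbers:

```java
/** Hypothetical helper reproducing the arithmetic of the CPU usage queries above. */
final class CpuUsage {

    private CpuUsage() {}

    /**
     * @param cpuSecondsPerSecond value of rate(container_cpu_usage_seconds_total[5m]) for one pod
     * @param cfsQuota            container_spec_cpu_quota (CPU time allowed per period)
     * @param cfsPeriod           container_spec_cpu_period (same unit as the quota)
     * @return fraction of the allowed CPU in use
     */
    static double usageFraction(double cpuSecondsPerSecond, long cfsQuota, long cfsPeriod) {
        double allowedCores = (double) cfsQuota / cfsPeriod; // e.g. 200000 / 100000 = 2 cores
        return cpuSecondsPerSecond / allowedCores;
    }
}
```

For example, a taskmanager with a quota of 200,000 per period of 100,000 may use 2 cores; if it consumes 1.5 CPU-seconds per second, usageFraction(1.5, 200_000, 100_000) returns 0.75, which Grafana displays as 75% with the percent(0.0-1.0) unit.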

The following screenshot from a Grafana dashboard shows an example of taskmanager CPU usage.

Screen capture of Grafana dashboard around taskmanager CPU usage.
Figure 4. Log entries around CPU usage.

Q: What does it mean when I get a "Savepoint took too long" message?

Screen capture of Portal display.
Figure 5. Savepoint Error Message

A: On a Pause or Upgrade operation for a Flink pipeline version, a savepoint is taken so that the pipeline version can be restarted from where it left off. On rare occasions, taking the savepoint may fail due to a timeout. If this happens, the error message "Savepoint took too long" is displayed. The example in Figure 5 timed out after 2 minutes (120,000 milliseconds).

Note

The Savepoint timeout for Flink pipelines has been increased to 10 minutes. This change improves the reliability of Stream pipelines by reducing the probability of this error.

If your Flink pipeline still experiences this issue, retry the operation. If savepoint failures continue, Cancel the pipeline and then Activate it again. Note that when a savepoint is not successful, no saved state is available for the pipeline version to resume from, so it starts processing the data from scratch.

For more information on Flink savepoints, see Flink Savepointing.

