%spark2_1.pyspark from pyspark.sql import SQLContext from pyspark.sql import HiveContext, Row from pyspark.sql.types import * import pandas as pd import pyspark.sql.functions as F trial_pps_order = spark.read.parquet('/tmp/exia/trial_pps_select') pps_order = spark.read.parquet('/tmp/exia/orders_pps_wc_member') member_info = spark.read.parquet('/tmp/exia/member_info') # newHiveContext=HiveContext(sc) query_T=""" select * from crm.masterdata_hummingbird_product_mst_banner_v1 where brand_name = 'pampers' """ product_mst=spark.sql(query_T) product_mst.show()
%spark2_1.pyspark: custom interpreter in Zeppelin 0.7.2
crm.masterdata_hummingbird_product_mst_banner_v1: hive table, data stored in hdfs encrypt zone.
The code throws exception below:
Traceback (most recent call last): File "/tmp/zeppelin_pyspark-7483288776781667654.py", line 367, in <module> raise Exception(traceback.format_exc()) Exception: Traceback (most recent call last): File "/tmp/zeppelin_pyspark-7483288776781667654.py", line 360, in <module> exec(code, _zcUserQueryNameSpace) File "<stdin>", line 14, in <module> File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/pyspark/sql/dataframe.py", line 318, in show print(self._jdf.showString(n, 20)) File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) Py4JJavaError: An error occurred while calling o76.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6, pg-dmp-slave28.hadoop, executor 1): java.io.IOException: No KeyProvider is configured, cannot access an encrypted file at org.apache.hadoop.hdfs.DFSClient.decryptEncryptedDataEncryptionKey(DFSClient.java:1338) at org.apache.hadoop.hdfs.DFSClient.createWrappedInputStream(DFSClient.java:1414) at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304) at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298) at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766) at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:109) at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67) at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257) at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:256) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:100) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1455) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1443) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1442) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1670) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1625) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1614) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2390) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2792) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2389) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2396) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2132) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2131) at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2822) at org.apache.spark.sql.Dataset.head(Dataset.scala:2131) at org.apache.spark.sql.Dataset.take(Dataset.scala:2346) at org.apache.spark.sql.Dataset.showString(Dataset.scala:248) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: No KeyProvider is configured, cannot access an encrypted file at org.apache.hadoop.hdfs.DFSClient.decryptEncryptedDataEncryptionKey(DFSClient.java:1338) at org.apache.hadoop.hdfs.DFSClient.createWrappedInputStream(DFSClient.java:1414) at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304) at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298) at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766) at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:109) at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67) at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257) at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:256) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:100) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more
So, Spark will use hive-site.xml to connect hiveserver2 in its conf directory. such as /usr/lib/spark-2.1.0-bin-hadoop2.6/conf, and the hive-site.xml will transmit to hive.
Solution:
add encrypt to hive-site.xml
<property> <name>hadoop.security.key.provider.path</name> <value>kms://http@dmp-master2.hadoop:16000/kms</value> </property> <property> <name>dfs.encrypt.data.transfer.algorithm</name> <value>3des</value> </property> <property> <name>dfs.encrypt.data.transfer.cipher.suites</name> <value>AES/CTR/NoPadding</value> </property> <property> <name>dfs.encrypt.data.transfer.cipher.key.bitlength</name> <value>256</value> </property> <property> <name>dfs.encryption.key.provider.uri</name> <value>kms://http@dmp-master2.hadoop:16000/kms</value> </property>