Using py-SparkSQL2 in Zeppelin to query hdfs encryption data

%spark2_1.pyspark
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext, Row
from pyspark.sql.types import *
import pandas as pd
import pyspark.sql.functions as F

trial_pps_order = spark.read.parquet('/tmp/exia/trial_pps_select')
pps_order = spark.read.parquet('/tmp/exia/orders_pps_wc_member')
member_info = spark.read.parquet('/tmp/exia/member_info')


# newHiveContext=HiveContext(sc)

query_T="""  

select  * from crm.masterdata_hummingbird_product_mst_banner_v1 
where brand_name = 'pampers'

"""
product_mst=spark.sql(query_T)

product_mst.show()

%spark2_1.pyspark: custom interpreter in Zeppelin 0.7.2
crm.masterdata_hummingbird_product_mst_banner_v1: hive table, data stored in hdfs encrypt zone.

The code throws exception below:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-7483288776781667654.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-7483288776781667654.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 14, in <module>
  File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/pyspark/sql/dataframe.py", line 318, in show
    print(self._jdf.showString(n, 20))
  File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o76.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6, pg-dmp-slave28.hadoop, executor 1): java.io.IOException: No KeyProvider is configured, cannot access an encrypted file
	at org.apache.hadoop.hdfs.DFSClient.decryptEncryptedDataEncryptionKey(DFSClient.java:1338)
	at org.apache.hadoop.hdfs.DFSClient.createWrappedInputStream(DFSClient.java:1414)
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
	at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:109)
	at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:256)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:100)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1455)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1443)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1442)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1670)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1625)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1614)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2390)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2792)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2389)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2396)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2132)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2131)
	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2822)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2131)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2346)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: No KeyProvider is configured, cannot access an encrypted file
	at org.apache.hadoop.hdfs.DFSClient.decryptEncryptedDataEncryptionKey(DFSClient.java:1338)
	at org.apache.hadoop.hdfs.DFSClient.createWrappedInputStream(DFSClient.java:1414)
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
	at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:109)
	at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:256)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:100)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more

So, Spark will use hive-site.xml to connect hiveserver2 in its conf directory. such as /usr/lib/spark-2.1.0-bin-hadoop2.6/conf, and the hive-site.xml will transmit to hive.

Solution:

add encrypt to hive-site.xml

  <property>
    <name>hadoop.security.key.provider.path</name>
    <value>kms://http@dmp-master2.hadoop:16000/kms</value>
  </property>
  <property>
    <name>dfs.encrypt.data.transfer.algorithm</name>
    <value>3des</value>
  </property>
  <property>
    <name>dfs.encrypt.data.transfer.cipher.suites</name>
    <value>AES/CTR/NoPadding</value>
  </property>
  <property>
    <name>dfs.encrypt.data.transfer.cipher.key.bitlength</name>
    <value>256</value>
  </property>
  <property>
    <name>dfs.encryption.key.provider.uri</name>
    <value>kms://http@dmp-master2.hadoop:16000/kms</value>
  </property>

 

Enable HTTPS access in Zeppelin

I was using certified key file to enable HTTPS, if you use self-signatured key, see second part

First part:
I had got two files which one is  the private key named server.key and another one is certification file named server.crt
Use the following command to create a jks keystore file

openssl pkcs12 -export -in xxx.com.crt -inkey xxx.com.key -out xxx.com.pkcs12
keytool -importkeystore -srckeystore xxx.com.pkcs12 -destkeystore xxx.com.jks -srcstoretype pkcs12

Second part:
Use self-signatured key

# Generate root key file and cert file, key file could be named key or pem, it's same.
openssl genrsa -out root.key(pem) 2048 # Generate root key file
openssl req -x509 -new -key root.key(pem) -out root.crt # Generate root cert file

# Generate client key and cert and csr file
openssl genrsa -out client.key(pem) 2048 # Generate client key file
openssl req -new -key client.key(pem) -out client.csr # Generate client cert request file
openssl x509 -req -in client.csr -CA root.crt -CAkey root.key(pem) -CAcreateserial -days 3650 -out client.crt # Use root cert to generate client cert file

# Generate server key and cert and csr file
openssl genrsa -out server.key(pem) 2048 # Generate server key file, use in Zeppelin
openssl req -new -key server.key(pem) out server.csr @ Generate server cert request file
openssl x509 -req -in server.csr -CA root.crt -CAkey root.key(pem) -CAcreateserial -days 3650 -out server.crt # Use root cert to generate server cert file

# Generate client jks file
openssl pkcs12 -export -in client.crt -inkey client.key(pem) -out client.pkcs12 # Package to pkcs12 format, must input a password, you should remember the password
keytool -importkeystore -srckeystore client.pkcs12 -destkeystore client.jks -srcstoretype pkcs12 # The client password you just input at last step

# Generate server jks file
openssl pkcs12 -export -in server.crt -inkey server.key(pem) -out server.pkcs12 # Package to pkcs12 format, must input a password, you should remember the password
keytool -importkeystore -srckeystore server.pkcs12 -destkeystore server.jks -srcstoretype pkcs12 # The server password you just input at last step

The server key, cert and jks are using to configure zeppelin, the client key, cert and jks are using to install into browser or your client access codes.
Then, make a directory to put the server things in it, such as

mkdir -p /etc/zeppelin/conf/ssl
cp server.crt server.jks /etc/zeppelin/conf/ssl

And then modify zeppelin-site.xml to enable https access

<property>
  <name>zeppelin.server.ssl.port</name>
  <value>8443</value>
  <description>Server ssl port. (used when ssl property is set to true)</description>
</property>
<property>
  <name>zeppelin.ssl</name>
  <value>true</value>
  <description>Should SSL be used by the servers?</description>
</property>
<property>
  <name>zeppelin.ssl.client.auth</name>
  <value>false</value>
  <description>Should client authentication be used for SSL connections?</description>
</property>
<property>
  <name>zeppelin.ssl.keystore.path</name>
  <value>/etc/zeppelin/conf/ssl/xxx.com.jks</value>
  <description>Path to keystore relative to Zeppelin configuration directory</description>
</property>
<property>
  <name>zeppelin.ssl.keystore.type</name>
  <value>JKS</value>
  <description>The format of the given keystore (e.g. JKS or PKCS12)</description>
</property>
<property>
  <name>zeppelin.ssl.keystore.password</name>
  <value>password which you input on generating server jks step</value>
  <description>Keystore password. Can be obfuscated by the Jetty Password tool</description>
</property>

Then, all completed, and you can redirect 443 to 8443 by using iptables or other reverse proxy tools

Spark read LZO file error in Zeppelin

Due to our dear stingy Party A  said they will add not any nodes to the cluster, so we must compress the data to reduce disk consumption. Actually  I like LZ4, it’s natively supported by hadoop, and the compress/decompress speed is good enough,  compress ratio is better than LZO. But, I must choose LZO finally, no reason.

Well, since we use Cloudera Manager to  install Hadoop and Spark, so it’s no error when read lzo file in command line, simply use as text file, Ex:

val data = sc.textFile("/user/dmp/miaozhen/ott/MZN_OTT_20170101131042_0000_ott.lzo")
data.take(3)

But in zeppelin, it will told me: native-lzo library not available, WTF?

Well, Zeppelin is a self-run environment, it will read its configuration only, do not read any other configs, Ex: it will not try to read /etc/spark/conf/spark-defaults.conf . So I must wrote all spark config such as you wrote them in spark-deafults.conf.

In our cluster, the Zeppelin conf looks like this:

Troubleshooting on Zeppelin with keberized cluster

We’ve updated Zeppelin from 0.7.0 to 0.7.1, still work with kerberized hadoop cluster, we use some interpreters in zeppelin, not all. And I wanna write some troubleshooting records with this awesome webtool. BTW: I can write a webtool better than this 1000 times, such as phpHiveAdmin, basically I can see the map/reduce prograss bar Continue reading Troubleshooting on Zeppelin with keberized cluster

Use kerberized Hive in Zeppelin

We deployed Apache Zeppelin 0.7.0 for the Kerberos secured Hadoop cluster, and my dear colleague cannot use it correctly, so I have to find out why he can’t use anything in Zeppelin, except shell command.

I start with Kerberized Hive Continue reading Use kerberized Hive in Zeppelin