Page 1 of 3
1 2 3

Using py-SparkSQL2 in Zeppelin to query hdfs encryption data

from pyspark.sql import SQLContext
from pyspark.sql import HiveContext, Row
from pyspark.sql.types import *
import pandas as pd
import pyspark.sql.functions as F

trial_pps_order ='/tmp/exia/trial_pps_select')
pps_order ='/tmp/exia/orders_pps_wc_member')
member_info ='/tmp/exia/member_info')

# newHiveContext=HiveContext(sc)


select  * from crm.masterdata_hummingbird_product_mst_banner_v1 
where brand_name = 'pampers'


%spark2_1.pyspark: custom interpreter in Zeppelin 0.7.2
crm.masterdata_hummingbird_product_mst_banner_v1: hive table, data stored in hdfs encrypt zone.

The code throws exception below:

Traceback (most recent call last):
  File "/tmp/", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 14, in <module>
  File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/pyspark/sql/", line 318, in show
    print(self._jdf.showString(n, 20))
  File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/lib/", line 1257, in __call__
    answer, self.gateway_client, self.target_id,
  File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/pyspark/sql/", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/lib/", line 328, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o76.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6, pg-dmp-slave28.hadoop, executor 1): No KeyProvider is configured, cannot access an encrypted file
	at org.apache.hadoop.hdfs.DFSClient.decryptEncryptedDataEncryptionKey(
	at org.apache.hadoop.hdfs.DFSClient.createWrappedInputStream(
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(
	at org.apache.hadoop.mapred.LineRecordReader.<init>(
	at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(
	at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:256)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.executor.Executor$
	at java.util.concurrent.ThreadPoolExecutor.runWorker(
	at java.util.concurrent.ThreadPoolExecutor$
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1443)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1442)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1670)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1625)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1614)
	at org.apache.spark.util.EventLoop$$anon$
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2390)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2792)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2132)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2131)
	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2822)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2131)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2346)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(
	at java.lang.reflect.Method.invoke(
	at py4j.reflection.MethodInvoker.invoke(
	at py4j.reflection.ReflectionEngine.invoke(
	at py4j.Gateway.invoke(
	at py4j.commands.AbstractCommand.invokeMethod(
	at py4j.commands.CallCommand.execute(
Caused by: No KeyProvider is configured, cannot access an encrypted file
	at org.apache.hadoop.hdfs.DFSClient.decryptEncryptedDataEncryptionKey(
	at org.apache.hadoop.hdfs.DFSClient.createWrappedInputStream(
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(
	at org.apache.hadoop.mapred.LineRecordReader.<init>(
	at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(
	at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:256)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.executor.Executor$
	at java.util.concurrent.ThreadPoolExecutor.runWorker(
	at java.util.concurrent.ThreadPoolExecutor$
	... 1 more

So, Spark will use hive-site.xml to connect hiveserver2 in its conf directory. such as /usr/lib/spark-2.1.0-bin-hadoop2.6/conf, and the hive-site.xml will transmit to hive.


add encrypt to hive-site.xml



Kerberos Master/Slave HA configuration

Since we only have one KDC on our cluster, it will be an SPOF (Single Point of Failure), so I have to create a Master/Slave KDC to avoid this problem.

There would be some steps to convert SP to HA.

master2.hadoop is existence KDC previously, master1.hadoop will install a new KDC server

  1. Install KDC on new node(master1.hadoop).
    yum -y install krb5-server
  2. Change config file on origin KDC(master2.hadoop)

    default_realm = PG.COM
    dns_lookup_kdc = false
    dns_lookup_realm = false
    ticket_lifetime = 7d
    renew_lifetime = 30d
    forwardable = true
    #default_tgs_enctypes = rc4-hmac
    #default_tkt_enctypes = rc4-hmac
    #permitted_enctypes = rc4-hmac
    udp_preference_limit = 1
    kdc_timeout = 3000
    PG.COM =
    kdc = master2.hadoop
    kdc = master1.hadoop
    admin_server = master2.hadoop
    default = FILE:/var/log/krb5kdc.log
    admin_server = FILE:/var/log/kadmind.log
    kdc = FILE:/var/log/krb5kdc.log

    Red block are very important on centos 6, orange block is the new line added

  3. On new node(master1.hadoop)
    scp master2.hadoop:/var/kerberos/krb5kdc/kdc.conf /var/kerberos/krb5kdc/
    scp master2.hadoop:/var/kerberos/krb5kdc/kadm5.acl /var/kerberos/krb5kdc/
    scp master2.hadoop:/var/kerberos/krb5kdc/.k5.PG.COM /var/kerberos/krb5kdc/
    scp master2.hadoop:/etc/krb5.conf /etc/
    : ank host/master1.hadoop
    : xst host/master1.hadoop
  4. On old node(master2.hadoop)
    : ank host/master2.hadoop
    : xst host/master2.hadoop
  5. And then back to new node(master1.hadoop)

    vi /var/kerberos/krb5kdc/kpropd.acl
    and insert two lines


    and then

    kdb_util stash
    kpropd -S
  6. Jump to old node(master2.hadoop)
    kdb_util dump /var/kerberos/krb5kdc/kdc.dump
    kprop -f /var/kerberos/krb5kdc/kdc.dump master1.hadoop

    When see “Database propagation to master1.hadoop: SUCCEEDED”, it means all the work have done well enough, and the slave should be start now.

  7. Last step on new node(master1.hadoop)
    service krb5kdc start

    The meaning of red block in step two is:
    Cenots 6.x with Kerberos 1.10.x had a bug that will cause sync kdb failed, the issue is there is a problem when you use rc4 as the default enctype. So you must comment the to avoid this happen. kprop doesn’t works with rc4 encrypt type.

    It fixed on kerberos 1.11.1

    finally: of course you should restart kdc and kadmin services

How to use cloudera parcels manually

Cloudera Parcel is actually a compressed file format, it just a tgz file with some meta info, so we can simply untar it with command tar zxf xxx.parcel. So we have the capability to  extract multi version of hadoop in a single machine. It’s easy to make hadoop upgrade or  downgrade, only ln -s CDH symbol link to a specific version directory.

With understanding that, I can package a self-distributed parcel package with my patches, and use cloudera-manager to manage the cluster… That sounds good

Integrate pyspark and sklearn with distributed parallel running on YARN

Python is useful for data scientists, especially with pyspark, but it’s a big problem to sysadmins, they will install python 2.7+ and spark and numpy,scipy,sklearn,pandas on each node, well, because Cloudera said that. Wow, imaging this, You have a cluster with 1000+ nodes or even 5000+ nodes, although you are good at DevOPS tools such as puppet, fabric, this work still cost lot of time. Continue reading Integrate pyspark and sklearn with distributed parallel running on YARN

Spark read LZO file error in Zeppelin

Due to our dear stingy Party A  said they will add not any nodes to the cluster, so we must compress the data to reduce disk consumption. Actually  I like LZ4, it’s natively supported by hadoop, and the compress/decompress speed is good enough,  compress ratio is better than LZO. But, I must choose LZO finally, no reason.

Well, since we use Cloudera Manager to  install Hadoop and Spark, so it’s no error when read lzo file in command line, simply use as text file, Ex:

val data = sc.textFile("/user/dmp/miaozhen/ott/MZN_OTT_20170101131042_0000_ott.lzo")

But in zeppelin, it will told me: native-lzo library not available, WTF?

Well, Zeppelin is a self-run environment, it will read its configuration only, do not read any other configs, Ex: it will not try to read /etc/spark/conf/spark-defaults.conf . So I must wrote all spark config such as you wrote them in spark-deafults.conf.

In our cluster, the Zeppelin conf looks like this:

Troubleshooting on Zeppelin with keberized cluster

We’ve updated Zeppelin from 0.7.0 to 0.7.1, still work with kerberized hadoop cluster, we use some interpreters in zeppelin, not all. And I wanna write some troubleshooting records with this awesome webtool. BTW: I can write a webtool better than this 1000 times, such as phpHiveAdmin, basically I can see the map/reduce prograss bar Continue reading Troubleshooting on Zeppelin with keberized cluster

Use kerberized Hive in Zeppelin

We deployed Apache Zeppelin 0.7.0 for the Kerberos secured Hadoop cluster, and my dear colleague cannot use it correctly, so I have to find out why he can’t use anything in Zeppelin, except shell command.

I start with Kerberized Hive Continue reading Use kerberized Hive in Zeppelin

Troubleshooting kerberized hive issues

Today, my colleagues want to use hive in zeppelin, it’s the first time to use hive in this new kerberized cluster, and unfortunately there was an authenticate issue of using hive. So I have to debug on it.

The hive client was installed hadoop-client and hive and put all the needed keytabs in config dirs and set the right permission of their all, but still could not connect to the cluster. The log always shows authentication failed. Continue reading Troubleshooting kerberized hive issues

Enable Kerberos secured Hadoop cluster with Cloudera Manager

I created a secured Hadoop cluster for P&G with cloudera manager, and this document is to record how to enable kerberos secured cluster with cloudera manager. Firstly we should have a cluster that contains kerberos KDC and kerberos clients Continue reading Enable Kerberos secured Hadoop cluster with Cloudera Manager

Dr.Elephant mysql connection error

This is the first time I try to use english to write my blog, so don’t jeer at the mistake of my grammar and spelling.

Because of multi threaded drelephant will cause JobHistoryServer’s Loads very high, so I stopped it for a strench of time. Until last week, a period pull from JHS patch merge request from github was released. I re-compiled dr. elephant and deploy the new dr. elephant on the cluster. It seems stable, but on this Monday morning, my leader told me that there were no more counters and any information about cluster jobs in dr. elephant.  So I logged in to the server, and check log, then I found this message below. Continue reading Dr.Elephant mysql connection error

Page 1 of 3
1 2 3