
JupyterHub integration with a customized Kerberos authenticator

Although I haven't spent much time on Hadoop and Spark maintenance lately, our first party wants to use JupyterHub/Lab on Hadoop, and their cluster is hardened with Kerberos and HDFS encryption, so I had to modify JupyterHub/Lab to fit the cluster's job submission.

The first party wanted SSO login first, and someday they may want to log in with Linux PAM users, so I wrote a custom authenticator that uses the Linux username together with that user's keytab to authenticate against Kerberos inside JupyterHub. That way the customers can submit their Hive or Spark jobs without running kinit themselves.

First, the keytab of each user follows a fixed filename convention: for example, the principal for user xianglei on host dmp-python1 is xianglei/dmp-python1@GC.COM, and its keytab file is named xianglei.py1.keytab.
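To make the mapping explicit, here is a minimal sketch of that convention; the helper name, the host-to-shortname mapping, and the home-directory placement (used by the spawner later in this post) are my own illustration, not part of the deployed code.

import os

REALM = 'GC.COM'

def keytab_and_principal(username, host='dmp-python1', host_short='py1'):
    # Illustrative helper: build the keytab path and Kerberos principal
    # for a Linux user following the naming convention described above.
    keytab = os.path.join('/home', username, '%s.%s.keytab' % (username, host_short))
    principal = '%s/%s@%s' % (username, host, REALM)
    return keytab, principal

# keytab_and_principal('xianglei')
# -> ('/home/xianglei/xianglei.py1.keytab', 'xianglei/dmp-python1@GC.COM')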

Next, the login handler in JupyterHub lives in jupyterhub/handlers/login.py.

Change login.py so that it logs in through the customer's SSO system and requests a username from it.

 

class LoginHandler(BaseHandler):
    """Render the login page."""

    # commented for matrix ai
    args = dict()
    args['contenttype'] = 'application/json'
    args['app'] = 'dmp-jupyterhub'
    args['subkey'] = 'xxxxxxxxxxxxxxxxxxx'

    def _render(self, login_error=None, username=None):
        return self.render_template(
            'login.html',
            next=url_escape(self.get_argument('next', default='')),
            username=username,
            login_error=login_error,
            custom_html=self.authenticator.custom_html,
            login_url=self.settings['login_url'],
            authenticator_login_url=url_concat(
                self.authenticator.login_url(self.hub.base_url),
                {'next': self.get_argument('next', '')},
            ),
        )

    async def get(self):
        """
        Modified to fit PG's customized OAuth2 system. If this method ships with Matrix AI, comment all of this out
        and write code that only gets the username from the Matrix AI login form.
        """
        self.statsd.incr('login.request')
        user = self.current_user

        if user:
            # set new login cookie
            # because single-user cookie may have been cleared or incorrect
            # if user exists, set a cookie and jump to next url
            self.set_login_cookie(user)
            self.redirect(self.get_next_url(user), permanent=False)
        else:
            # if the user doesn't exist, jump to the login page
            '''
            # below is original jupyterhub login, commented
            if self.authenticator.auto_login:
                auto_login_url = self.authenticator.login_url(self.hub.base_url)
                if auto_login_url == self.settings['login_url']:
                    # auto_login without a custom login handler
                    # means that auth info is already in the request
                    # (e.g. REMOTE_USER header)
                    user = await self.login_user()
                    if user is None:
                        # auto_login failed, just 403
                        raise web.HTTPError(403)
                    else:
                        self.redirect(self.get_next_url(user))
                else:
                    if self.get_argument('next', default=False):
                        auto_login_url = url_concat(
                            auto_login_url, {'next': self.get_next_url()}
                        )
                    self.redirect(auto_login_url)
                return
            username = self.get_argument('username', default='')
            self.finish(self._render(username=username))
            '''
            # below is the customized login GET
            import json
            import requests
            # the oauth2 access_token; default to '' so the emptiness check below works even when the cookie is missing
            access_token = self.get_cookie('access_token', '')
            self.log.info("access_token: " + access_token)
            token_type = self.get_argument('token_type', 'Bearer')
            if access_token != '':
                # use the token to call the SSO userinfo endpoint to get ShortName; ShortName is used for kerberos
                userinfo_url = 'https://xxxx.com/paas-ssofed/v3/token/userinfo'
                headers = {
                    'Content-Type': 'application/x-www-form-urlencoded',
                    'Ocp-Apim-Subscription-Key': self.args['subkey'],
                    'Auth-Type': 'ssofed',
                    'Authorization': token_type + ' ' + access_token
                }
                body = {
                    'token': access_token
                }
                resp = json.loads(requests.post(userinfo_url, headers=headers, data=body).text)
                user = resp['ShortName']
                data = dict()
                data['username'] = user
                data['password'] = ''
                # put ShortName into python dict, and call login_user method of jupyterhub
                user = await self.login_user(data)
                self.set_cookie('username', resp['ShortName'])
                self.set_login_cookie(user)
                self.redirect(self.get_next_url(user))
            else:
                self.redirect('http://xxxx.cn:3000/')

Do not change the post method of LoginHandler

Next I wrote a custom authenticator class, which I named GCAuthenticator. It does nothing special: it just returns the username handed over by LoginHandler. Put the file at jupyterhub/garbage_customer/gcauthenticator.py.

#!/usr/bin/env python

from tornado import gen
from jupyterhub.auth import Authenticator

class GCAuthenticator(Authenticator):
    """
    usage:
    1. generate hub config file with command
        jupyterhub --generate-config /path/to/store/jupyterhub_config.py

    2. edit config file, comment
    # c.JupyterHub.authenticator_class = 'jupyterhub.auth.PAMAuthenticator'
    and write a new line
    c.JupyterHub.authenticator_class = 'jupyterhub.garbage_customer.gcauthenticator.GCAuthenticator'
    """

    @gen.coroutine
    # The signature is fixed by JupyterHub. Do nothing and return the username directly, which counts as success.
    # The `data` passed in is the data['username'] / data['password'] dict defined earlier in the login handler.
    # Since authentication has already been done by the first party's SSO, the password is unused, but the format still has to be followed.
    # Following the same idea, it is easy to rewrite this method to validate users against MySQL, Postgres, or a plain text file.
    def authenticate(self, handler, data):
        user = data['username']
        return user
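As the comments above note, the same pattern extends to other user stores. Here is a minimal sketch of a flat-file variant, purely as an illustration; the file path and class name are made up and this is not part of the deployed code.

#!/usr/bin/env python
# Illustrative variant, not the deployed GCAuthenticator: accept a login
# only if the username appears in a flat file, one name per line.
from jupyterhub.auth import Authenticator

USER_FILE = '/etc/jupyterhub/allowed_users.txt'  # hypothetical path

class FlatFileAuthenticator(Authenticator):

    async def authenticate(self, handler, data):
        username = data.get('username', '')
        try:
            with open(USER_FILE) as f:
                allowed = {line.strip() for line in f if line.strip()}
        except FileNotFoundError:
            return None
        # Returning the username means success; returning None rejects the login.
        return username if username in allowed else None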

 

Next comes the local Kerberos authentication step.

When login succeeds, JupyterHub calls the spawner (spawner.py) to fork a subprocess; the spawner starts the single-user notebook app, which shows up in the process list as singleuser. Since that subprocess is created by the spawner, I can extend the spawner class to run the Kerberos authentication and inject environment variables there. This step does not require changing spawner.py itself; just write a new file as a plugin, e.g. gckrbspawner.py.

 

# Save this file in your site-packages directory as krbspawner.py
#
# then in /etc/jupyterhub/config.py, set:
#
#    c.JupyterHub.spawner_class = 'garbage_customer.gckrbspawner.KerberosSpawner'


from jupyterhub.spawner import LocalProcessSpawner
from jupyterhub.utils import random_port
from subprocess import Popen,PIPE
from tornado import gen
import pipes

REALM = 'GC.COM'

# KerberosSpawner extends LocalProcessSpawner from spawner.py
class KerberosSpawner(LocalProcessSpawner):
    @gen.coroutine
    def start(self):
        """启动子进程的方法"""
        if self.ip:
            self.user.server.ip = self.ip # 用户服务的ip为jupyterhub启动设置的ip
        else:
            self.user.server.ip = '127.0.0.1' # 或者是 127.0.0.1
        self.user.server.port = random_port() # singleruser server, 也就是notebook子进程, 启动时使用随机端口
        self.log.info('Spawner ip: %s' % self.user.server.ip)
        self.log.info('Spawner port: %s' % self.user.server.port)
        cmd = []
        env = self.get_env() # 获取jupyterhub的环境变量
        # self.log.info(env)
        
        """ Get user uid and gid from linux"""
        uid_args = ['id', '-u', self.user.name] # look up the linux uid of the logged-in user
        uid = Popen(uid_args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
        uid = uid.communicate()[0].decode().strip()
        gid_args = ['id', '-g', self.user.name] # look up the linux gid of the logged-in user
        gid = Popen(gid_args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
        gid = gid.communicate()[0].decode().strip()
        self.log.info('UID: ' + uid + ' GID: ' + gid)
        self.log.info('Authenticating: ' + self.user.name)

        cmd.extend(self.cmd)
        cmd.extend(self.get_args())

        self.log.info("Spawning %s", ' '.join(pipes.quote(s) for s in cmd))
        # authenticate the kerberos principal using the linux user. Since jupyterhub uses /home/username as each user's directory by default, I put each user's keytab under /home/username
        # e.g. xianglei.wb1.keytab corresponds to linux user xianglei and kerberos principal xianglei/gc-dmp-workbench1@GC.COM.
        kinit = ['/usr/bin/kinit', '-kt',
                 '/home/%s/%s.wb1.keytab' % (self.user.name, self.user.name,),
                 '-c', '/tmp/krb5cc_%s' % (uid,),
                 '%s/gc-dmp-workbench1@%s' % (self.user.name, REALM)]
        self.log.info("KRB5 initializing with command %s", ' '.join(kinit))
        # use subprocess.Popen to run kinit as a child process inside the spawner
        Popen(kinit, preexec_fn=self.make_preexec_fn(self.user.name)).wait()

        popen_kwargs = dict(
            preexec_fn=self.make_preexec_fn(self.user.name),
            start_new_session=True,  # don't forward signals
        )
        popen_kwargs.update(self.popen_kwargs)
        popen_kwargs['env'] = env
        self.proc = Popen(cmd, **popen_kwargs)
        self.pid = self.proc.pid
        # return the ip and port to the jupyterhub server so it can register the subprocess service
        return (self.user.server.ip, self.user.server.port)

Then add this setting to the generated JupyterHub config file:

c.JupyterHub.spawner_class = 'garbage_customer.gckrbspawner.KerberosSpawner'
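After a notebook is spawned, you can check from inside it that the ticket cache written by kinit is really visible to the user process. A quick check, assuming the /tmp/krb5cc_<uid> cache path used above (this is my own verification snippet, not part of the spawner):

import os
import subprocess

# The spawner above writes the ticket cache to /tmp/krb5cc_<uid>.
cache = '/tmp/krb5cc_%d' % os.getuid()
result = subprocess.run(['klist', '-c', cache],
                        stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
print(result.stdout.decode())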

In the next post I will describe how to auto-refresh the Kerberos authentication.

DS means Data Scientist? NO!

These “Data Scientists” from our dear first party supply all of my jokes for this year. (Updated at any time)

  1. DS: Why my spark job takes so much time?
    Me: ?
    DS:
    Me: No, It’s a resident YARN container, not your spark job!
  2. DS: Why my spark job report error?
    Me: Give me your code and the screenshot.
    DS: 
    Me: No, you should not stop the Spark context before you run Spark. I mean, you should not ask someone to answer your question after you murdered him!
  3. DS: Why I can’t login SSO system?
    (The SSO was built on their system, using OAuth2; username and password authentication all happened on their servers. I only took the authorization code and userinfo after they logged in.)
    Me: It's not my business. Please contact your server admin if you are sure the username and password are correct.
    DS: I don’t care, you should solve this problem.
    Me: Sorry ma'am, I can't fix your company's servers.
    DS: I don’t care, you must fix it.
    Me: Alright, please give me the root password of your SSO server.
    DS: I don't know what you are talking about; you should fix it.
    Another DS: Hey, It’s not their problem, you should contact our infosec team.
    DS: Ohch
  4. Me: You‘ve written such beautiful PigLatinic Python.
    DS: Thank you, I thought so.

DS means “Data Scientist”? No, I call them “Definitively Stupid”.

JupyterLab and PySpark2 integration in 1 minute

As we run CDH 5.14.0 on our Hadoop cluster, the highest Spark version supported is 2.1.3, so this post records how I installed pyspark-2.1.3 and integrated it with jupyter-lab.

Environment:
spark 2.1.3
CDH 5.14.0 – hive 1.1.0
Anaconda3 – python 3.6.8

  1. Add these exports to spark-env.sh
    export PYSPARK_PYTHON=/opt/anaconda3/bin/python
    export PYSPARK_DRIVER_PYTHON=/opt/anaconda3/bin/jupyter-lab
    export PYSPARK_DRIVER_PYTHON_OPTS='  --ip=172.16.191.30 --port=8890'
  2. Install sparkmagic
    pip install sparkmagic
  3. Use conda or pip to downgrade ipykernel to 4.9.0, because ipykernel 5.x does not work with sparkmagic and throws a Future exception.
    https://github.com/jupyter-incubator/sparkmagic/issues/492
  4. Launch /opt/spark-2.1.3/bin/pyspark --master yarn (a quick sanity check in the notebook is sketched below)

If you need to run it in the background, use nohup.
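Once JupyterLab comes up as the PySpark driver, pyspark's shell startup already provides sc and spark; a minimal sanity check you can run in a notebook cell (my own illustration, not from the original setup notes):

# Run in a JupyterLab notebook cell; `sc` and `spark` come from pyspark's shell startup.
print(sc.version)                        # should print 2.1.3
print(sc.parallelize(range(100)).sum())  # 4950; proves executors actually run on YARN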

If necessary, add a kernel JSON at /usr/share/jupyter/kernels/pyspark2 or /usr/local/share/jupyter/kernels/pyspark2, with content such as:
{
  "argv": [
    "python3.6",
    "-m",
    "ipykernel_launcher",
    "-f",
    "{connection_file}"
  ],
  "display_name": "Python3.6+PySpark2.1",
  "language": "python",
  "env": {
    "PYSPARK_PYTHON": "/opt/anaconda3/bin/python",
    "SPARK_HOME": "/opt/spark-2.1.3-bin-hadoop2.6",
    "HADOOP_CONF_DIR": "/etc/hadoop/conf",
    "HADOOP_CLIENT_OPTS": "-Xmx2147483648 -XX:MaxPermSize=512M -Djava.net.preferIPv4Stack=true",
    "PYTHONPATH": "/opt/spark-2.1.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip:/opt/spark-2.1.3-bin-hadoop2.6/python/",
    "PYTHONSTARTUP": "/opt/spark-2.1.3-bin-hadoop2.6/python/pyspark/shell.py",
    "PYSPARK_SUBMIT_ARGS": " --jars /opt/spark-2.1.3-bin-hadoop2.6/jars/greenplum-spark_2.11-1.6.2.jar --master yarn --deploy-mode client --name JuPysparkHub pyspark-shell",
    "JAVA_HOME": "/opt/jdk1.8.0_141"
  }
}

Another problem: in pyspark, sqlContext could not access the remote Hive metastore, without raising any exception. When I ran show databases in pyspark, it always returned only default. It turned out there was a hive-exec-1.1.0-cdh5.14.0.jar in spark2's jars directory; after deleting that jar, everything worked.
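A simple way to verify the fix from the pyspark shell (what you see afterwards depends on what your metastore actually holds):

# Before removing hive-exec-1.1.0-cdh5.14.0.jar this only ever showed `default`;
# after removing it and restarting pyspark, the databases from the remote Hive metastore appear.
spark.sql('show databases').show()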

Using pyspark SparkSQL (Spark 2) in Zeppelin to query HDFS encryption zone data

%spark2_1.pyspark
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext, Row
from pyspark.sql.types import *
import pandas as pd
import pyspark.sql.functions as F

trial_pps_order = spark.read.parquet('/tmp/exia/trial_pps_select')
pps_order = spark.read.parquet('/tmp/exia/orders_pps_wc_member')
member_info = spark.read.parquet('/tmp/exia/member_info')


# newHiveContext=HiveContext(sc)

query_T="""  

select  * from crm.masterdata_hummingbird_product_mst_banner_v1 
where brand_name = 'pampers'

"""
product_mst=spark.sql(query_T)

product_mst.show()

%spark2_1.pyspark is a custom interpreter in Zeppelin 0.7.2.
crm.masterdata_hummingbird_product_mst_banner_v1 is a Hive table whose data is stored in an HDFS encryption zone.

The code throws the exception below:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-7483288776781667654.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-7483288776781667654.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 14, in <module>
  File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/pyspark/sql/dataframe.py", line 318, in show
    print(self._jdf.showString(n, 20))
  File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark-2.1.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o76.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6, pg-dmp-slave28.hadoop, executor 1): java.io.IOException: No KeyProvider is configured, cannot access an encrypted file
	at org.apache.hadoop.hdfs.DFSClient.decryptEncryptedDataEncryptionKey(DFSClient.java:1338)
	at org.apache.hadoop.hdfs.DFSClient.createWrappedInputStream(DFSClient.java:1414)
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
	at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:109)
	at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:256)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:100)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1455)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1443)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1442)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1670)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1625)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1614)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2390)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2792)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2389)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2396)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2132)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2131)
	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2822)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2131)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2346)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: No KeyProvider is configured, cannot access an encrypted file
	at org.apache.hadoop.hdfs.DFSClient.decryptEncryptedDataEncryptionKey(DFSClient.java:1338)
	at org.apache.hadoop.hdfs.DFSClient.createWrappedInputStream(DFSClient.java:1414)
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
	at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
	at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:109)
	at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:256)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:100)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more

So Spark uses the hive-site.xml in its conf directory (such as /usr/lib/spark-2.1.0-bin-hadoop2.6/conf) to talk to Hive, and that hive-site.xml is passed along to Hive.

Solution:

Add the encryption-related properties to hive-site.xml:

  <property>
    <name>hadoop.security.key.provider.path</name>
    <value>kms://http@dmp-master2.hadoop:16000/kms</value>
  </property>
  <property>
    <name>dfs.encrypt.data.transfer.algorithm</name>
    <value>3des</value>
  </property>
  <property>
    <name>dfs.encrypt.data.transfer.cipher.suites</name>
    <value>AES/CTR/NoPadding</value>
  </property>
  <property>
    <name>dfs.encrypt.data.transfer.cipher.key.bitlength</name>
    <value>256</value>
  </property>
  <property>
    <name>dfs.encryption.key.provider.uri</name>
    <value>kms://http@dmp-master2.hadoop:16000/kms</value>
  </property>
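If editing the hive-site.xml in Spark's conf directory is not convenient, the same Hadoop client settings can in principle be passed through Spark's spark.hadoop.* configuration prefix instead. This is a sketch I have not tested in this setup, with the values copied from the properties above:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('encrypted-zone-read')
         # spark.hadoop.* entries are copied into the Hadoop Configuration used by the HDFS client
         .config('spark.hadoop.hadoop.security.key.provider.path',
                 'kms://http@dmp-master2.hadoop:16000/kms')
         .config('spark.hadoop.dfs.encryption.key.provider.uri',
                 'kms://http@dmp-master2.hadoop:16000/kms')
         .enableHiveSupport()
         .getOrCreate())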

 

Kerberos Master/Slave HA configuration

Since we only have one KDC on our cluster, it is a single point of failure (SPOF), so I have to set up a master/slave KDC pair to avoid this problem.

Here are the steps to convert the single KDC into an HA setup.

Description
master2.hadoop is the existing KDC; a new KDC server will be installed on master1.hadoop.

  1. Install KDC on new node(master1.hadoop).
    yum -y install krb5-server
  2. Change the config file (krb5.conf) on the original KDC (master2.hadoop)

    [libdefaults]
    default_realm = PG.COM
    dns_lookup_kdc = false
    dns_lookup_realm = false
    ticket_lifetime = 7d
    renew_lifetime = 30d
    forwardable = true
    #default_tgs_enctypes = rc4-hmac
    #default_tkt_enctypes = rc4-hmac
    #permitted_enctypes = rc4-hmac
    udp_preference_limit = 1
    kdc_timeout = 3000
    [realms]
    PG.COM = {
        kdc = master2.hadoop
        kdc = master1.hadoop
        admin_server = master2.hadoop
    }
    [logging]
    default = FILE:/var/log/krb5kdc.log
    admin_server = FILE:/var/log/kadmind.log
    kdc = FILE:/var/log/krb5kdc.log

    The commented-out enctype lines above are very important on CentOS 6 (see the explanation at the end of this section), and kdc = master1.hadoop is the newly added line.

  3. On new node(master1.hadoop)
    scp master2.hadoop:/var/kerberos/krb5kdc/kdc.conf /var/kerberos/krb5kdc/
    scp master2.hadoop:/var/kerberos/krb5kdc/kadm5.acl /var/kerberos/krb5kdc/
    scp master2.hadoop:/var/kerberos/krb5kdc/.k5.PG.COM /var/kerberos/krb5kdc/
    scp master2.hadoop:/etc/krb5.conf /etc/
    kadmin
    : ank host/master1.hadoop
    : xst host/master1.hadoop
  4. On old node(master2.hadoop)
    kadmin
    : ank host/master2.hadoop
    : xst host/master2.hadoop
  5. And then back to new node(master1.hadoop)

    vi /var/kerberos/krb5kdc/kpropd.acl
    and insert two lines

    host/master1.hadoop@PG.COM
    host/master2.hadoop@PG.COM

    and then

    kdb5_util stash
    kpropd -S
  6. Jump to old node(master2.hadoop)
    kdb5_util dump /var/kerberos/krb5kdc/kdc.dump
    kprop -f /var/kerberos/krb5kdc/kdc.dump master1.hadoop

    When you see “Database propagation to master1.hadoop: SUCCEEDED”, it means everything has gone well and the slave can be started now (a sketch for repeating this propagation on a schedule follows after this list).

  7. Last step on new node(master1.hadoop)
    service krb5kdc start

    The reason the commented lines in step 2 matter:
    CentOS 6.x ships Kerberos 1.10.x, which has a bug that makes the KDB sync fail when rc4 is used as the default enctype, so you must comment those enctype lines out to avoid it. kprop does not work with the rc4 encryption type.

    https://github.com/krb5/krb5/commit/8d01455ec9ed88bd3ccae939961a6e123bb3d45f

    It was fixed in Kerberos 1.11.1.

    Finally: of course you should restart the krb5kdc and kadmin services.
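The kprop in step 6 only copies the database once; to keep the slave current you would normally re-run the dump and kprop from the master on a schedule (e.g. cron). A minimal sketch of such a job, assuming the same paths as above:

#!/usr/bin/env python
# Illustrative propagation job for the master KDC (master2.hadoop):
# dump the Kerberos database and push it to the slave with kprop.
import subprocess
import sys

DUMP_FILE = '/var/kerberos/krb5kdc/kdc.dump'
SLAVE = 'master1.hadoop'

def propagate():
    subprocess.check_call(['kdb5_util', 'dump', DUMP_FILE])
    subprocess.check_call(['kprop', '-f', DUMP_FILE, SLAVE])

if __name__ == '__main__':
    try:
        propagate()
    except subprocess.CalledProcessError as err:
        sys.exit(err.returncode)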

Enable HTTPS access in Zeppelin

I was using a CA-signed key file to enable HTTPS; if you use a self-signed key, see the second part.

First part:
I had two files: the private key named server.key and the certificate file named server.crt.
Use the following commands to create a JKS keystore file:

openssl pkcs12 -export -in xxx.com.crt -inkey xxx.com.key -out xxx.com.pkcs12
keytool -importkeystore -srckeystore xxx.com.pkcs12 -destkeystore xxx.com.jks -srcstoretype pkcs12

Second part:
Using a self-signed key

# Generate the root key and cert files; the key file can be named .key or .pem, it's the same.
openssl genrsa -out root.key(pem) 2048 # Generate root key file
openssl req -x509 -new -key root.key(pem) -out root.crt # Generate root cert file

# Generate client key and cert and csr file
openssl genrsa -out client.key(pem) 2048 # Generate client key file
openssl req -new -key client.key(pem) -out client.csr # Generate client cert request file
openssl x509 -req -in client.csr -CA root.crt -CAkey root.key(pem) -CAcreateserial -days 3650 -out client.crt # Use root cert to generate client cert file

# Generate server key and cert and csr file
openssl genrsa -out server.key(pem) 2048 # Generate server key file, use in Zeppelin
openssl req -new -key server.key(pem) -out server.csr # Generate server cert request file
openssl x509 -req -in server.csr -CA root.crt -CAkey root.key(pem) -CAcreateserial -days 3650 -out server.crt # Use root cert to generate server cert file

# Generate client jks file
openssl pkcs12 -export -in client.crt -inkey client.key(pem) -out client.pkcs12 # Package to pkcs12 format, must input a password, you should remember the password
keytool -importkeystore -srckeystore client.pkcs12 -destkeystore client.jks -srcstoretype pkcs12 # The client password you just input at last step

# Generate server jks file
openssl pkcs12 -export -in server.crt -inkey server.key(pem) -out server.pkcs12 # Package to pkcs12 format, must input a password, you should remember the password
keytool -importkeystore -srckeystore server.pkcs12 -destkeystore server.jks -srcstoretype pkcs12 # The server password you just input at last step

The server key, cert, and JKS are used to configure Zeppelin; the client key, cert, and JKS are meant to be installed into your browser or used by your client code.
Then make a directory to hold the server files, such as:

mkdir -p /etc/zeppelin/conf/ssl
cp server.crt server.jks /etc/zeppelin/conf/ssl

And then modify zeppelin-site.xml to enable https access

<property>
  <name>zeppelin.server.ssl.port</name>
  <value>8443</value>
  <description>Server ssl port. (used when ssl property is set to true)</description>
</property>
<property>
  <name>zeppelin.ssl</name>
  <value>true</value>
  <description>Should SSL be used by the servers?</description>
</property>
<property>
  <name>zeppelin.ssl.client.auth</name>
  <value>false</value>
  <description>Should client authentication be used for SSL connections?</description>
</property>
<property>
  <name>zeppelin.ssl.keystore.path</name>
  <value>/etc/zeppelin/conf/ssl/xxx.com.jks</value>
  <description>Path to keystore relative to Zeppelin configuration directory</description>
</property>
<property>
  <name>zeppelin.ssl.keystore.type</name>
  <value>JKS</value>
  <description>The format of the given keystore (e.g. JKS or PKCS12)</description>
</property>
<property>
  <name>zeppelin.ssl.keystore.password</name>
  <value>password which you input on generating server jks step</value>
  <description>Keystore password. Can be obfuscated by the Jetty Password tool</description>
</property>

That completes the setup; you can redirect 443 to 8443 with iptables or another reverse proxy tool.
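To confirm the endpoint from a client, a quick check with Python requests works; the hostname and the root.crt path are placeholders (for a CA-signed certificate, verify=True is enough):

import requests

# Check the Zeppelin HTTPS endpoint against our root certificate.
resp = requests.get('https://zeppelin.example.com:8443/',  # placeholder hostname
                    verify='/path/to/root.crt')
print(resp.status_code)

# If zeppelin.ssl.client.auth were set to true, the client cert/key pair
# would also be needed: requests.get(..., cert=('client.crt', 'client.key'))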

How to use cloudera parcels manually

A Cloudera parcel is really just a compressed file format: a tgz file with some metadata, so we can simply untar it with tar zxf xxx.parcel. That gives us the ability to extract multiple versions of Hadoop on a single machine, which makes Hadoop upgrades or downgrades easy: just point the CDH symlink (ln -s) at the specific version directory.

With that understanding, I can package my own parcel with my patches and still use Cloudera Manager to manage the cluster… That sounds good.

Integrate pyspark and sklearn with distributed parallel running on YARN

Python is useful for data scientists, especially with pyspark, but it is a big problem for sysadmins: they have to install Python 2.7+, Spark, and numpy, scipy, sklearn, pandas on every node, because that is what Cloudera says. Imagine a cluster with 1000+ or even 5000+ nodes; even if you are good at DevOps tools such as Puppet or Fabric, this work still costs a lot of time.

Spark read LZO file error in Zeppelin

Because our dear stingy Party A said they will not add any nodes to the cluster, we must compress the data to reduce disk consumption. I actually prefer LZ4: it is natively supported by Hadoop, its compress/decompress speed is good enough, and its compression ratio is better than LZO's. But in the end I had to choose LZO, no reason given.

Since we used Cloudera Manager to install Hadoop and Spark, there is no error when reading an LZO file from the command line; just read it as a text file, e.g.:

val data = sc.textFile("/user/dmp/miaozhen/ott/MZN_OTT_20170101131042_0000_ott.lzo")
data.take(3)

But in Zeppelin it told me: native-lzo library not available. WTF?

Zeppelin is a self-contained environment: it reads only its own configuration and ignores everything else; for example, it will not read /etc/spark/conf/spark-defaults.conf. So I had to write all the Spark settings into Zeppelin's config, just as you would write them in spark-defaults.conf.

In our cluster, the Zeppelin conf looks like this:
