• Hadoop基础知识之——MapReduce

    一、背景

    在上一篇文章《Hadoop基础知识之——数据!数据!》一文中,已经交代了Hadoop 诞生的背景和意义,这一篇正式开始接触 Hadoop基本知识,当然从 MapReduce 开始讲起。

    目前 Hadoop官网上最近版本已经到 3.x 了:

    1

    二、环境准备

    工欲善其事,必先利其器。  在学习 Hadoop 的时候,我们必然会涉及到很多 Java 的代码和知识。  所以为了方便,这里先简单列举如何准备 Java 的语言环境和代码编辑器。

    2.1、安装 eclipse 编辑器

    Eclipse是由IBM开发并捐赠给开源社区的一个IDE,也是目前应用最广泛的IDE。Eclipse的特点是它本身是Java开发的,并且基于插件结构,即使是对Java开发的支持也是通过插件JDT实现的。

     

    下载过程这里不再详细赘述,免费的,很简单,参考: https://www.liaoxuefeng.com/wiki/1252599548343744/1255883818398144

    2.2、本地安装 Java

    Java SE JDK 安装也非常简单,这里不再详细赘述,参考: https://www.liaoxuefeng.com/wiki/1252599548343744/1280507291631649

    2.3、安装 Maven

    Maven就是是专门为Java项目打造的管理和构建工具,它的主要功能有:

    • 提供了一套标准化的项目结构;
    • 提供了一套标准化的构建流程(编译,测试,打包,发布……);
    • 提供了一套依赖管理机制。

    我们肯定会在 本地 Eclipse 中创建 Java 项目来测试,所以需要预先准备好 maven 环境。

    Maven 的安装也非常简单,这里不再详细赘述,参考: https://www.liaoxuefeng.com/wiki/1252599548343744/1309301146648610

    2.4、在 Eclipse 中配置使用 Maven 进行项目管理

    首先,确保本地 MAC 环境已经安装好 Java 和 Maven 环境:

    然后,在 Eclipse 中配置 Maven 相关环境:

    打开 Eclipse–>Prefrences–>Maven–>Installations , 点击右侧的Add按钮,在弹出的窗口中,选择 2.3下载解压好的文件夹,如图:

    2

    然后你就把maven装在你的Eclipse中了,但是我们知道,maven是一个方便我们管理jar包的工具,我们需要用到的jar包都是从maven的中央远程仓库里下载的,但是我们不希望每次都去本地仓库里下载,当我们下载过一次之后就可以在我们的本地仓库中导入jar包,那么,怎么连接本地仓库呢?

    新建一个文件夹,作为本地仓库,比如这里我新建了一个 maven_repo的文件夹:

    然后找到2.3 中下载解压后的maven文件夹,里面有conf子文件夹,找到settings.xml, 编辑加入一行配置:

    回到Eclipse,打开Preferences —>Maven—->User Settings:

    第一个框填的是下载 settings.xml 文件路径。

    3

    第二个框填的是刚才的本地仓库路径。

     

    三、MapReduce

    3.1、什么是 MapReduce

    宏观上来看:MapReduce 是一种数据处理的编程模型。 它利用分布式的文件系统,将任务在不同的机器上计算,然后统计聚合计算结果返回。 总体上采取了分而治之与迭代的思想,让计算能力在一定程度上水平无限扩展,从而应对复杂、庞大数据量的计算。

    MapReduce 编程模型将任务过程分为两个处理阶段: map 阶段 和 reduce 阶段。

    每阶段都以键值对作为输入和输出,类型由自己定义。

    我们只需要完成两个函数: map 函数 和 reduce 函数即可,然后 MR 框架会自动帮我们去分布式的执行这两个函数。

    3.2、以例说明

    最典型的例子就是 wordcount 词频统计了,即读取某个文本文件并计算其中单词出现的频率。输入是文本文件,输出是文本文件。 输出的内容是每行包含一个单词和它出现的次数,用制表符分隔。

    当我们安装好Hadoop 环境后,像这种 WordCount 基本测试用例,都是官方自带的:

    当然这个是源码文件,代码比较简洁,但是想要直接运行还需要编译。

    而Hadoop本身将这种 通用型测试 都打包在了 hadoop-examples.jar 包里面了,wordcount 就包含在里面。 我们可以直接运行来看这里面都包含哪些测试程序:

    如上,这里面包含了 wordcount 程序,可以直接运行。

    执行完之后,我们发现 output 如下:

    额,这里竟然生成 1301个 文件,这不是典型的小文件问题吗 ? 这里记一笔,容后再看为啥会这样。

    我们先看结果:

    如上,他确实完成了词频统计,但是这个 java 代码不是很好理解,不接地气。

    趁此机会,刚好要学习 java ,就摸一下从 本地 eclipse 开发环境编写wordcount 项目,并上传至 Kerberos 的 CDH 环境运行。

    3.3. 实践探索,自己实现 wordcount

    首先,我们来创建项目目录结构。 一个使用Maven管理的普通的Java项目,它的目录结构默认如下:

    项目的根目录 a-maven-project 是项目名,它有一个项目描述文件pom.xml,存放Java源码的目录是src/main/java,存放资源文件的目录是src/main/resources,存放测试源码的目录是src/test/java,存放测试资源的目录是src/test/resources,最后,所有编译、打包生成的文件都放在target目录里。这些就是一个Maven项目的标准目录结构。

    所有的目录结构都是约定好的标准结构。使用标准结构不需要做任何配置,Maven就可以正常使用。

    1

    如上,我们创建好目录排版后,创建了三个 代码文件,一个map函数类、一个reduce函数类、一个运行代码类。

    WordCountMapper类:

    如上,我们自定义了 名为 WordCountMapper 的类,继承自 Mapper 类。 这个 Mapper 类是一个抽象类(什么是抽象类?),它有四个形参类型,分别指定 map 函数的输入键、输入值、输出键、输出值的类型:  <LongWritable, Text, Text, LongWritable>

    Hadoop 本身提供了一套可优化网络序列化传输的基本类型,而不直接使用 Java 内嵌的类型。 这些类型都在 org.apache.hadoop.io 包中。 这里使用 LongWritable 类型(相当于java的long长整型)、Text类型(相当于java的String类型)和 IntWritable 类型(相当于 java 的Integer类型)。

    map() 方法的输入是一个键一个值。 我们在代码中先将 一行输入的 Text 转换为 Java 的String 类型,然后用 split 切割放入数组。 map() 函数还提供 Context 实例用于输出内容的写入。我们将 word 和 1 搭配写入 context 。

    WordCountReducer类:

    同样,我们自定义了 WordCountReducer类,继承自 Reduce类,它同样四个形参。reduce 函数的输入类型必须匹配map函数的输出类型。

    看到这里可能会有疑问,代码中只是将 count 进行累加了,并没有判断 key 是否相同,不是应该将 相同key 值的count 进行累加吗 ?

    其实这里我们少了一个 shuffle 环节,即清洗。  shuffle 环节处于 map 之后, reduce 之前。

    WordCount 类:

    如上,Job 对象指定作业执行规范。 我们可以用它来控制整个作业的运行。我们在代码中构造 Job 对象之后,需要指定输入和输出数据的路径。 调用 FileInputFormat 类的静态方法 addInputPath() 来定义输入数据的路径,这个路径可以是单个文件、一个目录(将目录下的所有文件当作输入)。

    调用 FileOutputFormat 类中的静态方法,setOutputPath() 来指定输出路径。 这个方法指定的是 reduce 函数输出文件的写入目录。在运行作业前,该目录是不应该存在的。

    通过 setMapperClass() 和 setReducerClass() 方法指定要用的 map 类方法类型和 reduce 类方法类型。

    setOutputKeyClass() 和 setOutputvalueClass() 方法控制 reduce 函数的输出类型,并且必须和 reduce 类产生的相匹配。  map 函数的输出类型默认和 reduce 是相同的,因此没有额外专门指定。但是如果不同,则需要用 setMapOutputKeyClass() 和  setMapOutputVlaueClass() 来指定。

    最后调用 job 中的 waitForCompletion() 方法来提交作业并等待执行完成。该方法返回 bool 值,表示 true 成功  或者 flase 失败。

    在 Hadoop 集群上运行整个作业时,要把代码打包成一个 Jar 包。

    我们知道 Maven 是通过 Pom 文件来识别并编译管理相关依赖的,我们的程序依赖 hadoop-common 和 hadoop-client 等包,可以在这里指定,我并不需要额外去下载这些包,指定了  https://repository.cloudera.com/artifactory/cloudera-repos 仓库,它会自动帮我去找相关的依赖包。

    pom.xml :

    写好pom文件后,我们就可以开始准备生成 jar 包了:  项目右键选择 debug as,然后进入如下操作:

    Goals 填写: clean compile package

    5

    完成后将自动生成 target 目录:

    7

    我们看到 target 目录下已经生成我们想要的 WordCount-1.0-SNAPSHOT.jar 包了,我们将这个包传到客户端服务器。

    jar 包需要放在本地客户端服务器:

    从上面的结果信息中,我们可以看到一些有价值的东西:

    这个作业的标识 是 job_1602495843334_9212309,执行了 2个 map 和 1300 个 reduce 任务。Counters 部分显示了一些统计信息,我们可以看到:

    10 个 map 输入记录(两个文件一共有10行),产生 46 个 map 输出记录。随后,这 46 个记录,分成 38 组 reduce 输入记录,产生 38 个reduce 输出记录。

    我们看到输出目录下1300 个 part-r-xxxx 文件大部分都是空的(1300 个reduce),大小为0,而不为空的刚好有 38-1 个:


    当运行完成后,可以看到结果如下:

    3.3 概念扩展 与 加深理解

    通过上面的实践,我们已经和可以自己写一个最简单的 Maven 项目,并传到集群,去执行获取结果。  但是我们对集群上任务本身的运行原理还不够了解,是时候停下来扩展一下概念,加深理解。

    3.3.1 MR 的任务数据流

    我们提交一个 MR 作业(Job 或者 Application) 到集群上之后,Hadoop 将作业分成了 两类 任务 Map 和 Reduce。 这些任务运行在集群的节点上,通过 Yarn 去调度。 如果一个任务失败,它将在另一个节点上自动重新调度运行。

    在这个过程中,Hadoop 将 MR 的输入数据划分成 等长的小数据块,称为输入分片(input split)。Hadoop 为每个分片构建一个 map 任务,并由该任务来运行用户自定义的 map 函数从而处理分片中的每条记录。

    那么可想而知,理论上分片越多,map 越多,并发越高就会更快。 但是另一方面,如果分片切的越小,那么管理分片的总时间和构建map的任务的总时间将决定做作业的整个执行时间。

    • 那么分片取多少最合适呢? 对于大多数作业来说,合理的分片大小趋向于 HDFS 的一个块的大小,默认为 128MB。

    我们知道,Hadoop 在存储有输入数据(HDFS中的数据)的本节点上运行 map 任务,可以获得最佳性能,因为它无需使用宝贵的集群带宽资源。 这就是所谓的 “数据本地化优化”。   在实际的集群运行过程中,有时候对一个map任务的 输入分片来说,存储该分片的 HDFS Block 的所有Daatanode 节点可能刚好被正在运行的其他任务占满了。  所以这时候就要从某一个数据块所在的机架中的一个节点上寻找一个空闲的 map 槽来运行该map任务(甚至在某些非常特殊的情况下,会使用其他机架的节点来运行该map任务,这将导致机架之间的网络传输)。

    假如当一个 分片跨越两个数据块,那么对于任何一个 HDFS 节点,基本上都不可能同时存储这两个数据块,因此分片中的部分数据需要通过网络传输到map任务运行的节点。 与本 节点运行相比,这种方法显然效率更低。

    • map 任务的输出写入本地磁盘,而不是 HDFS,为什么呢 ? 

    因为map任务的输出是中间结果,该中间结果由 reduce 任务处理后才产生最终输出结果,而且一旦作业完成,map 任务的输出结果就可以删除。因此,如果把它存储在 HDFS中并且实现三副本,难免有些小题大做。 如果运行 map 任务的节点在将 map中间结果传送到 reduce 之前失败,Hadoop 将会在另一个节点上重新运行这个map任务以再次构建map 中间结果。

    • reduce 任务的输出写入 HDFS,又是为什么呢 ? 

    reduce 任务并不具备数据本地化的优势,单个reduce 任务的输入通常来自于多个 map 任务的输出。 因此,排过序的map输出需要通过网络传输发送到运行 reduce 任务的节点。 数据在reduce 端合并,然后由用户定义的 reduce 函数处理。

    对于 reduce 输出的每个 HDFS 块,第一个副本存储在本地节点上,其他副本处于可靠性考虑存储在其他机架的节点中。

    因此,将reduce 的输出写入hdfs确实需要占据一定的集群带宽,但这与正常的 hdfs 管线写入的消耗一样,可以接受。

    • map 任务的数量往往是由输入数据的大小 输入分片决定的,那 reduce 的数量呢 ? 

    reduce 任务的数量并非由输入数据的大小决定,是可以独立指定的。

    如果有好多个 reduce 任务,每个map 任务就会针对输出进行分区(partition),为每个 reduce 任务建一个分区。 每个分区有许多键值,但相同键对应的键值对记录都在同一分区中。

    分区可由用户定义的分区函数控制,但通常用默认的 parttitioner 通过哈希函数来分区,很高效。

    多个map,一个reduce 图示(图片摘自网上):

    1

    多个map,多个reduce 图示(图片摘自网上):

    2

    如上,从这里我们可以更好的理解 shuffle 的作用,顾名思义,洗牌 就是 合并归类的作用。

    当然,当数据处理可以完全并行,不需要 shuffle时,可能会出现 无 reduce 任务的情况。 比如 我们用 distcp 进行数据迁移拷贝时。

     

    3.4 用 Python 实现 WordCount
    3.4.1 Hadoop Streaming

    尽管 Hadoop 框架是用Java编写的,但是跑 Hadoop 的程序不一定用Java,也可以用其他语言开发,比如 Python 或 c++等。 Hadoop 提供了 MapReduce 的 API,允许你使用 非 Java 的其他语言来写自己的map 和 reduce 函数。

    Hadoop Streaming 使用 Unix 标准流作为 Hadoop 和应用程序之间的接口,所以我们可以使用任何编程语言通过标准输入/输出 来写 MapReduce 程序。

    Streaming 天生适合用于文本处理。map 的输入数据通过标准输入流传递给 map 函数,并且是 一行一行的传输,最后将结果行写到标准输出。

    map 输出的键值对是以制表符分割的行,reduce 函数的输入格式与之相同并通过标准输入流进行传输。 reduce 函数从标准输入流中读取输入行,该输入已由 Hadoop 框架根据键值排过序,最后将结果写入标准输出。

    3.4.2 学习 官网的Python 实现 WordCount

    我们先来看一下官网中的 Python跑MR案例

    2

    如上,按照这个文档说明的话,貌似必须使用Jython将Python代码转换成Java jar文件。显然,这不是很方便,如果我们用Jython没有提供的Python特性,甚至可能会产生问题。 Hadoop 自带的Python Wordcount 目录默认在:

    但这个代码感觉不是很友好,就是一点都不 Python:

    这里面采用了大量 org.apahce.hadoop 的包与类,乍一看不好理解,我们用自己的方式去实现一个:

    3.4.3 自己构造 Python WordCount 程序

    根据 3.4.1 我们知道 hadoop streaming 可以很好的处理并被调用,我们的程序目的与标准:

    与传统的 wordcount 一样,我们为单词计数,即读取某个文本文件并计算其中单词出现的频率。输入是文本文件,输出是文本文件。 输出的内容是每行包含一个单词和它出现的次数,用制表符分隔。

    我们先写一个 mapper 脚本,它将从STDIN读取数据,将其拆分为单词,并输出一个行列表,将单词与其计数映射到STDOUT。

    但是,Map脚本不会计算单词出现次数的和。它将立即输出(word ,1)这样的元组。

    然后,我们实现 reducer函数, 它将从STDIN读取mapper.py的结果,并将出现的每个单词和最终的计数相加,然后将其结果输出到STDOUT。

    如上,需要注意的是,这个代码中,要求 reduce 的输入是按照 word 排序过的,即相同的 word 行是挨着进来的,否则会出错。

    我们可以测试一下:

    mapper 将一行输入映射成了 word  1 这样的元组。

    如果我们直接接上 reduce 则会出错,如下,wang 这个单词被分开统计了:

    需要,在中间加一个 shuffle 环节,我们用 sort 来模拟:

    本次测试通过后,我们在 hadoop 集群上去测试运行我们的代码。

    我们先搞一些测试数据来,从这三个地方分别下载三份测试数据:

    将此三个文件移动到 hdfs 集群的目录下:

    然后我们利用 hadoop-streaming 包来执行:

    输出结果如下:

     

     

     

    参考

    Writing An Hadoop MapReduce Program In Python

     

     

     

     

     

  • CDH集群监控方案架构与实现(一)基础监控

    背景

    大数据集群的维护本身是充满挑战的,庞大的数据量、复杂的运算逻辑、关联的大数据组件、一个企业级的集群往往有PB 级的数据、成百上千和各式运算任务 在一套集群上运行。  所以运维如果想不是很被动的话,就必须做好各式监控,预防风险、发现风险、分析问题、进而针对性的处理问题。

    因为大数据集群一旦出问题,往往就已经很难处理了 ,所以监控要前置,有趋势预测、有风险评估、才行。

    针对这些,我们要从多个维度入手对大数据集群进行基础监控:

    如下,我们对整个集群的基础监控主要分为三大维度:

    1. 底层服务监控:服务器底层的CPU、内存、IO、磁盘 等;
    2. Hadoop性能监控:各式组件各维度性能监控
    3. Runtime 监控:自定义脚本轮训服务;

    11

    底层服务监控借助 Zabbix,这个很简单,不再详赘了。

    这里主要说下第二个维度: 组件性能监控。

    CDH 组件性能监控

    对 CDH 组件的性能监控,如上图,我们借助 Prometheus + AlertManager + Grafana 的架构方式。

    • 写一个 exporter ,从 Cloudera Manager 的 API 中取出各式组件的 Metrics 。
    • 借助 Alertmanager 对需要的 metrics 进行告警。
    • 写一个 钉钉告警API 供 Alertmanager 调用。
    • 将 Prometheus data source 接入 Grafana 做报表展示。

    这里具体的代码与实现细节,不展开了,否则太多,主要是将大概的思路和实现方式以及官网地址交待清楚:

    一、明确要获取的 Metrics

    首先我们要明确有哪些Metrics 可以供我们获取,或者说我们从哪里挑到 可以获取的 Metrics 列表。 这一步我们从 Cloudera Manager 官网可以看到:

    https://docs.cloudera.com/documentation/enterprise/5-15-x/topics/cm_metrics.html

    b

    这里有所有的 CM 组件相关 Metrics,我们随便点一个进去,可以看到 组件的 Metrics 详细信息,有描述,也有单位等:

    1

    如果我们对这些 Metrics 还不是很懂的话,其实可以借助 Cloudera Manager 的图表生成器:

    1

    如上,其实这些 Metric 从 CM 的图标生成器也可以查到,并且有自动匹配功能和中文释义,非常方便。

    二、Cloudera Manager 的 API 接口交互

    与 Cloudera Manager 的接口交互当然也是看官方文档:http://cloudera.github.io/cm_api/apidocs/v19/

    1

    API 对应的官方文档里面有大量的接口调用方式,包括一些动作、状态、Metrics 获取等。这里我们暂时只关注 Metrics 获取,我们会发现,官方文档中所有的 Metrics 获取都建议我们走时序型接口:

    2

    如上,所有关于角色的 Metrics 建议我们走 时序型 API 去获取:

    3

    最简单的一个 Python 代码调用示例如下:

    得到的结果如下:

    如上,就可以获取到对应 Metrics 相关值 。

    三、自己写 Exporter 从 Cloudera Manager 获取 Metrics 数据

    这里用 Python 写一个 Exporter ,获取数据,因为涉及到的 Metrics 很多,所以采取多线程并发的方式去获取。

    这里主要借助 Python的这个库:https://github.com/prometheus/client_python#instrumenting

    1

    这个 Python 包里有大量 prometheus exporter client 方式,可以参考使用。

    这块的代码就不再这里详细赘述了,总体来说,开 30个线程并发,取 几百个 Item,最终入Prometheus 的时候大概 几万个 Metric ,总共需要约 8 s 多。

    四、搭建配置 AlertManager ,并实现自定义钉钉告警接口

    数据入到 Prometheus 之后,我们当然要去设置针对一些Metrics 的告警阈值,这里借助和Prometheus 的告警插件 AlertManager,AlertManager 的安装很简单,这里不再详赘,之前的 Prometheus初探 一文中已经说过,这里主要说下配置相关:

    关于相关配置可以参考官方文档:https://prometheus.io/docs/alerting/configuration/

    如上,我们在 AlertManager 中加了这些配置,它的配置总体来说主要是包括 routes 告警转发路由,receiver 告警接收器,inhibit_rule 告警收敛规则 等。

    如上,我们默认将报警都发给了我们自己实现的一个  http://10.25.x.x:5000/ding_alert  webhook,其实是用 Python 写的一个钉钉告警接收器。

    五、Flask 钉钉告警网关

    正如上面所说,我们自己实现一个简单的钉钉告警接收器,用 Flask 实现,可以让 AlertManager 通过 5000 端口调用发送钉钉告警,这里主要是要注意处理发送与接收格式:

    相关代码简要如下:

    六、Prometheus 中配置 AlertManager ,并添加配置告警项

    前面的准备好之后,我们还需要配置 Prometheus 告诉它哪些需要告警,并且把AlertManager 加进去:

    先是 Prometheus 的主配置文件:

    监控项我们放在单独的 rules 目录的文件下:

    七、Grafana 接入 Prometheus

    其实前六步昨晚之后,数据就已经入到我们的 Prometheus 了,可以在Prometheus 上查到:

    1

    如上图,我们点击 Graph ,然后输入 Metrics 名称或正则匹配就可以查到,点击 图片中间的 Graph还可以看到图形。

    当然,我们配置的 Alert 也是可以在这里看到的:

    2

    甚至我们可以像 Zabbix一样看到告警相关的 Reports:

    1

    当然最重要的是也能收到相关的钉钉告警:

    4

    但是单纯的收到告警信息不行,我们需要一个炫酷的,有维度的监控展示。  Grafana 是很好的选择,我们只需要将 Prometheus 作为 Datasource 纳入 Grafana,即可构建丰富的报表展示:

    4

     

    总结

    至此,我们就完成了我们的基础监控部分。 可以随着各式各样的需求逐步完善收集 Metrics 即可。 既有告警信息、又有丰富的图形展示,而且很灵活,方便调配。

    但这只是基础监控,距离我们掌控运维好集群还有很大的距离,后续还要继续补充几个维度:

    • 元数据监控。
    • 业务运行日志与任务状态监控。
    • 定时统计报表发送等。

    其实每个维度想要做好都很难,会在后续的时间过程中持续更新此系列博客。

     

     

  • Kudu 升级引发的磁盘扩容将MBR分区修改为GPT

    背景

    一夜之间,Kudu 集群的存储达到了瓶颈,6台 Kudu 服务器的数据存储分区基本都快满了:

    10

    所以需要扩容Kudu存储的这个磁盘,但没想到是个苦逼的过程,计划中的步骤是这样的:

    1. 停止 Kudu 服务,关掉 ECS 服务器。
    2. 阿里云高效云盘做快照,以防万一。
    3. 阿里云高效云盘扩容至 6T 。
    4. 重新启动 ECS 服务器,fdisk 系列命令重新生成分区,并使用 xfs_growfs 命令提升磁盘分区空间。

    但是在真正做的过程中,发现一个问题:

    目前这个分区是 2T 的,并且用的是 MBR 分区方式,MBR 分区最大就支持2T,GPT分区方式才支持更大的分区。

    经过思考得出如下两种解决方案,过程记录如下:

    方案一(已放弃)
    1. 新挂载一个GPT格式的磁盘分区
    2. 然后将数据目录拷贝到新的分区
    3. 修改kudu的目录参数,在新的分区启动存储

    此方式最大的问题就是在实际操作的时候,发现拷贝慢到超乎我的想象,试了好多办法:cp、mv、dd 都不行,2T 的数据拷完大概需要十几个小时,我分析应该是小文件太多的缘故,导致不管用哪种方式,都需要很长时间,测试了半天以后,最终放弃了。

    方案二: 修改已有的 MBR 分区格式为 GPT

    首先不得不说这种操作方式,真的很危险,不到万不得已不要使用,虽然我这次成功了。但是在阿里云的官方文途径是获取不到操作步骤的,它不建议这么做,自然不会给你提供文档说明,它只是建议我们用方案一的:

    11

    但是方案一对我们来说成本太高了,我们继续探索修改分区方式。

    首先,在阿里云扩容此分区到 6T,过程可以参考:阿里云磁盘扩容概述

    然后,按照网上这篇文档进行磁盘分区方式修改和扩容:Linux下数据无损动态修改MBR分区表格式为GPT

     

    扩容完毕。

     

     

     

  • Flask explorer FAQ

    背景

    此博文意在记录Flask探索过程中遇到的一些问题,及解决。

    一、Flask_sqlalchemy 报错ModuleNotFoundError: No module named ‘MySQLdb’

    原因:

    在 python3  中取消了MySQLdb的数据库连接驱动方式,转而替换成了   pymysql 。而 python2 连接数据驱动可以用  pymysql、MySQLdb 两种。sqlalchemy 默认使用 MySQLdb 连接数据库,所以如果你用的是Python3 作为Flask 项目的代码版本,则会遇到这个问题。

    解决:

    二、abort 函数 触发 error_handle 时报参数错误

    问题现象:

    我在API接口的 errors.py 模块中定义了蓝图对应的错误handle,如下:

    所以只要在 view.py 接口函数中,用abort函数则可以抛出对应的错误信息:

    但是运行时却报如下错误:

    TypeError: Object of type BadRequest is not JSON serializable

    问题的原因:

    经过排查,发现是 message 参数类型的缘故,无法被 jsonify 转换为 json:

    所以 message 是flask 异常类的对象,不能被直接转换为 json 格式中的一环。

    问题解决:

    将此对象进行类型强转,转换为 str 类型,如下:

    然后再次请求,结果如下:

     

     

     

  • SQL优化案例(二)高并发下的简单SQL占尽CPU资源

    背景

    线上数据库突然从某一刻开始CPU资源告警,如下:

    a

    和业务核对上线发布情况和系统调整后,发现可能不是由上线代码改动引起的。

    登录线上服务器后,执行  show processlist 发现比平时的连接数要高很多,而且很多都在执行这一条SQL:

    a

    开始其实我并没有怀疑这条SQL,因为它确实很简单,没有 order by ,也没有 group by。 所以第一时间是怀疑有大事务或者死锁发生,但是经过查看发现并不是:

    紧接着我怀疑是不是有其他的大SQL 与 这个高并发 SQL 有冲突,于是翻找了一下,发现是有两个SQL比较慢,并且耗费资源,但是量不大,应该难以这么快降数据库拖垮。

    于是还是将目标瞄准了这个简单SQL,下面是它的一些基本信息:

    表结构与索引如下:

    表行数如下:

    SQL 的执行计划如下:

    在主库上执行这条SQL只需要0.22s(我们的主库是读写分离,所以当时没有问题,可以用来测试):

    profile 分析结果如下:

    解决

    其实也是因为运气好,出问题时RDS实例没有各种业务模块的SQL在并发进行,所以我在管理控制台,将所有的读请求量转到一台空闲的备份RDS上。 发现这台新的RDS又立马被这条SQL夯满了,并且因为没有CPU资源所以很多查询都被阻塞的很慢 。

    值得怀疑的有两点:

    1、就是profile 中 Sending data 环节的CPU资源占用还是有点的多的,虽然是0.17 左右,但是成百上千个同时过来的话也不算小。

    2、就是Explain结果中的 rows 有18w行,而 filtered 字段值只有1.11,也就意味着这条SQL每执行一次要扫描 18w行,但是还要进行过滤,过滤的比率高达 99% 左右,才返回。

    从上面的分析结果来看,所以做出如下修改:

    然后再看执行计划和profile:

    如上,加的索引立马被用到了,扫描行数变成了 2406 ,filtered 变成了 5%,再看下Profile:

    如上,变化果然很明显,Sending Data 环节的CPU消耗立马减少了好多。

    然后加了此索引后,线上也立马恢复正常了。

    总结

    有时候一个看似简单的SQL,在一定的量级下也会对数据库造成毁灭性的打击。 这件事又一次侧面强调了索引的重要性。

    另外再出现类似的情况时,可以结合 Explain 和 Profile 来综合分析。

     

     

     

  • CDH 添加Spark2服务

    背景

    我们用的 Cloudera Manager 5.15.1 默认自带的Spark版本是 1.6.0 ,这个版本的 Spark 略有点老,用起来不是很方便。所以其实想在集群中并行Spark1 和 Spark2 来跑。

    在CDH中,Spark2 和之前我们之前讲的Kafka一样是单独作为parcel分发安装包来支持的。

    打开官网的Document文档首页,不用选择版本,在下面就可以看到Spark2的Quick Links:

    a

    1、CDS Powered by Apache Spark

    CDH 里的Spark2 是托管于 CDS中的一个附加服务,此组件 通常支持在 CDH 5.9 或更高版本中。

    Spark2 在 CDS 2.0   release1 发行版中存在一些Hive兼容性问题,主要影响 CDH 5.10.1及以上,CDH 5.9.2及以上,CDH 5.8.5及以上,CDH 5.7.6及以上。 如果正在使用这些CDH版本之一,则必须升级到CDS 2.0  release2 或更高的包,以避免在使用Hive功能时发生 Spark2 作业失败。

    在CDH上使用Spark2 时,并不是所有的特性、组件、建议等都与官方原生的Spark完全切合。

    Spark 2 由这几个相关项目组成:

    • Spark SQL: Module for working with structured data. Allows you to seamlessly mix SQL queries with Spark programs.
    • Spark Streaming: API that allows you to build scalable fault-tolerant streaming applications.
    • MLlib: API that implements common machine learning algorithms.

    Cloudera 提供的版本主要有 Spark: 1.6, 2.0, 2.1, 2.2, 2.3, 2.4。

    在 CDH5 上,Spark 1.6 服务可以与Spark 2服务共存。这两个服务的配置并不冲突,并且两个服务使都可以基于 Yarn 来运行。Spark History 服务器的端口是18088(用于Spark 1.6)和 18089 (用于Spark 2)。

    2. Install Requirement (安装需要)
    • Spark 2 无法在 Scala 2.10.上运行,需要 安装配置 Scala 2.11
    • Python 版本:
      • Python 2.7 or higher, when using Python 2.
      • Python 3.4 or higher, when using Python 3. (CDS 2.0 only supports Python 3.4 and 3.5; CDS 2.1 and higher include support for Python 3.6 and higher.)
    • Java JDK8,如果你使用了CSD2.2 或更高版本,必须从集群所有节点移除JDK7 。
    • 虽然可以使 Spark1 和 Spark2 并行,但是不能同时安装使用配置多个 Spark2 版本。

    二、安装

    2.1 CSD包下载配置

    首先在如下地址中,下载CSD 对应的Spark2 包: http://archive.cloudera.com/spark2/csd/

    a

    将下载的 CSD包放到到机器的/opt/cloudera/csd目录下,并修改权限:

     

    2.2 parcel包下载配置

    首先在如下地址中,下载parcel包:http://archive.cloudera.com/spark2/parcels/

    a

    将下载的包放到 parcel-repo 目录下,并修改权限:

    注1:下载parcel包时要下载对应的版本,这里系统使centOS 7,所以下载el7

    注2:CSD和 parcel 包都有cloudera1 和 cloudera2之分,因此在下载CSD和parcel包时两者版本必须一致。

    注3:如果 /opt/cloudera/parcel-repo目录下有其他的安装包,不用删除 ,但是如果本目录下有其他的重名文件比如manifest.json文件,把它重命名备份掉。然后把新的文件放在这里。

    2.3 CM 中配置 Parcel 生效

    首先,重启 CM :

    然后在CM的控制界面 hosts–>parcels 就可以看到对应的Parcel中多了 Spark2 ,并且是已下载待分配状态:

    a

    然后我们点击分配,这个分配的过程其实是把这些包从 CM 拷贝到所有的其他节点(包括Gateway),我们来对比一下:

    分配完成后,会发现它处于待激活状态,我们来看一下:

    a

    我们点击激活,我们发现激活完毕后,在 parcels 目录下多了一个软链:

    2.4 服务添加与部署

    这时候就可以在CM管理界面选择Spark2 服务去添加了:

    a

    分配角色:

    a

    遇到的问题1:

    安装好之后,测试使用时发现 Spark2-shell ,Pyspark2 的使用都没有问题,但是Spark2-submit 在跑任务时会出现问题:

    当我安装的Spark2.4版本时,发现运行Spark2-submit 测试任务报如下错误:

    如上,它一直报这个错误,然后任务无法正常运行下去。

    经过大量测试,无法定位解决,最后把 Spark2.4 换成 Spark2.3,发现上面那个报错没了,但是有了新的问题。

    在Spark2.3 下,当我指定 deploy-mode 方式为 Yarn-cluster 模式时,任务可以顺利正常运行,当我指定 deploy-mode 为 Client 模式时,任务会出现如下现象: