人生苦短,几十岁了还不行乐?

0%

Hadoop与HDFS

组员:
2031570 谈瑞
2031568 薛锦伟

分布式计算期末项目简单记录,水一水~

一 系统架构

1 Hadoop基本介绍

Hadoop是一个高可靠的(reliable),规模可扩展的(scalable),分布式(distributed computing)的开源软件框架。它使我们能用一种简单的编程模型来处理存储于集群上的大数据集。

Hadoop是Apache基金会的一个开源项目,是一个提供了分布式存储和分布式计算功能的基础架构平台。可以应用于企业中的数据存储,日志分析,商业智能,数据挖掘等。其为应用提供可靠性和数据移动。它实现了名为 MapReduce 的编程范式:应用程序被分割成许多小部分,而每个部分都能在集群中的任意节点上执行或重新执行。此外,Hadoop 还提供了分布式文件系统HDFS,用以存储所有计算节点的数据,这为整个集群带来了非常高的带宽。MapReduce 和分布式文件系统的设计,使得整个框架能够自动处理节点故障。它使应用程序与成千上万的独立计算的电脑和 PB 级的数据。

由于Hadoop的架构与本次项目需求较吻合,因此我们直接使用Hadoop的HDFS和MapReduce框架来进行此次项目所需要的分布式系统的实现。

2 集群架构图

image-20181228192856958

3 集群机器

主机名 内存 IP 软件 运行进程
node0 512MB 192.168.137.200 ZooKeeper QuorumPeerMain
node1 512MB 192.168.137.201 ZooKeeper QuorumPeerMain
node2 512MB 192.168.137.202 ZooKeeper QuorumPeerMain
master 2GB 192.168.137.100 Hadoop,Hive,MySql JournalNode,NameNode,ResourceManager,
DFSZKFailoverController,HiveServer2,MySql
master1 2GB 192.168.137.10 Hadoop,Hive JournalNode,NameNode,ResourceManager,
DFSZKFailoverController,HiveServer2
slave1 1GB 192.168.137.101 Hadoop JournalNode,DataNode,NodeManager
slave2 1GB 192.168.137.102 Hadoop DataNode,NodeManager
slave3 1GB 192.168.137.103 Hadoop DataNode,NodeManager
host 8GB 192.168.137.1 应用服务器

4 集群搭建

4.1 简介

集群使用VirtualBox创建了8台虚拟机模拟真实环境中的分布式集群,虚拟机全部使用CentOS7-x86_64系统,其中3台ZooKeeper集群,5台Hadoop集群(2台Master,3台Slave),Windows本机作为应用程序服务器用于连接此集群。

4.2 虚拟机创建

此集群中机器的系统基本配置几乎是一样的,只是在后期所担任的角色不同,因此这里我创建了一台虚拟机,而后将环境配好后复制了7台,而后针对其所担任角色进行针对性修改。

首先创建了一台裸机,要解决的第一个问题是虚拟机与主机的网络通信,这里我采用VirtualBox中的Host-Only连接方式,以保证虚拟机与主机之间正常的网络通信,同时需要在主机上共享网络,以保证虚拟机同时还能访问互联网。

在主机网络设置中共享网络:

image-20181228201105314

在VirtualBox中执行以下操作设置主机连接方式:

image-20181228200129326

在虚拟机终端执行以下操作:

# 修改虚拟机的IP、子网掩码
vim /etc/sysconfig/network-scripts/ifcfg-enp0s3
# 修改为以下内容
TYPE=Ethernet
IPADDR=192.168.137.100
NETMASK=255.255.255.0
# 保存退出
# 修改网关地址
vim /etc/sysconfig/network
# 修改为以下内容
NETWORKING=yes
GATEWAY=192.168.137.1
# 保存退出
# 修改主机名为master,后续过程中访问本机只需要主机名而不用敲IP
hostnamectl set-hostname master
# 关闭并停用防火墙,由于这里使用的是局域网,因此无需太多考虑网络安全
systemctl stop firewalld
systemctl disable firewalld
# 重启网络服务
systemctl restart network
# 尝试从虚拟机ping网关以及从主机ping虚拟机hostname或者ip,若都能ping通说明网络配置成功
ping 192.168.137.1
# 从虚拟机ping外网查看是否可以连接互联网,这里测试百度IP:61.135.169.105
ping 61.135.169.105
# 修改hosts文件,添加局域网中其他主机的主机名与ip的映射
vim /etc/hosts
# 修改为以下内容
192.168.137.100 master
192.168.137.10 master1
192.168.137.101 slave1
192.168.137.102 slave2
192.168.137.103 slave3
192.168.137.200 node0
192.168.137.201 node1
192.168.137.202 node2
0.0.0.0 localhost
# 保存退出

至此,虚拟机网络配置已完成,下面安装Hadoop 2.9.2及Hive 2.3.4(下载、解压步骤省略),并执行一些准备工作:

# 首先添加Hadoop和Hive相关环境变量
vim /etc/profile
# 添加下列内容
export HADOOP_MAPRED_HOME=/usr/local/hadoop
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HIVE_HOME=/usr/local/hive
export PATH=$HIVE_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
# 保存退出,并使环境变量生效
source /etc/profile

Hadoop和Hive的配置需放到各台虚拟机上分别执行,因为不同虚拟机所需要的配置不同。

4.3 虚拟机复制

上述步骤已经创建好了一个虚拟机,下面需要复制出7个,并对每台机器针对性的进行一些修改。

4.3.1 网络配置

对于每台虚拟机需要执行以下几个步骤以保证9台机器之间形成一个网络:

  • 修改IP

    vim /etc/sysconfig/network-scripts/ifcfg-enp0s3

    针对集群机器中定义的IP将IPADDR项修改为对应的IP

  • 修改主机名

    hostnamectl set-hostname XXX

    针对集群机器中定义的主机名执行以上命令修改为特定的主机名

  • 重启网络服务

    systemctl restart network
  • ping各个节点测试是否成功

    ping master
    ping master1
    ping slave1
    ping slave2
    ping slave3
    ping node0
    ping node1
    ping node2
    ping 192.168.137.1
    ping 61.135.169.105
4.3.2 Hadoop配置
  1. 修改core-site.xml
vim $HADOOP_HOME/etc/hadoop/core-site.xml
  • 作用:Hadoop集群的核心配置文件

  • 需要修改的机器:master、master1、slave1、slave2、slave3

  • 内容:

    <configuration>
    <property>
    <name>fs.defaultFS</name>
    <value>hdfs://ns</value>
    </property>
    <property>
    <name>hadoop.tmp.dir</name>
    <value>/var/hadoop</value>
    </property>
    <property>
    <name>dfs.permissions.enabled</name>
    <value>true</value>
    </property>
    <!-- 指定zookeeper地址 -->
    <property>
    <name>ha.zookeeper.quorum</name>
    <value>node0:2181,node1:2182,node2:2181</value>
    </property>
    <!-- 允许访问此hdfs的主机和群组,此处设置为任意 -->
    <property>
    <name>hadoop.proxyuser.root.hosts</name>
    <value>*</value>
    </property>
    <property>
    <name>hadoop.proxyuser.root.groups</name>
    <value>*</value>
    </property>
    </configuration>
  1. 修改hdfs-site.xml
vim $HADOOP_HOME/etc/hadoop/hdfs-site.xml
  • 作用:hdfs集群配置文件

  • 需要修改的机器:master、master1、slave1、slave2、slave3

  • 内容:

    <configuration>
    <!-- 指定dfs文件存储位置 -->
    <property>
    <name>dfs.data.dir</name>
    <value>/var/hadoop-data</value>
    </property>
    <!-- 指定文件备份份数 -->
    <property>
    <name>dfs.replication</name>
    <value>2</value>
    </property>
    <!-- 指定机器运行情况检查时间间隔 -->
    <property>
    <name>dfs.namenode.heartbead.recheck-interval</name>
    <value>3000000ms</value>
    </property>
    <!-- 指定hdfs的nameservice为ns,和core-site.xml保持一致 -->
    <property>
    <name>dfs.nameservices</name>
    <value>ns</value>
    </property>
    <!-- NS下面的NameNode -->
    <property>
    <name>dfs.ha.namenodes.ns</name>
    <value>nn1,nn2</value>
    </property>
    <!-- nn1的RPC通信地址 -->
    <property>
    <name>dfs.namenode.rpc-address.ns.nn1</name>
    <value>master:9000</value>
    </property>
    <!-- nn1的http通信地址 -->
    <property>
    <name>dfs.namenode.http-address.ns.nn1</name>
    <value>master:50070</value>
    </property>
    <!-- nn2的RPC通信地址 -->
    <property>
    <name>dfs.namenode.rpc-address.ns.nn2</name>
    <value>master1:9000</value>
    </property>
    <!-- nn2的http通信地址 -->
    <property>
    <name>dfs.namenode.http-address.ns.nn2</name>
    <value>master1:50070</value>
    </property>
    <!-- 指定NameNode的元数据在JournalNode上的存放位置 -->
    <property>
    <name>dfs.namenode.shared.edits.dir</name>
    <value>qjournal://master:8485;master1:8485;slave1:8485/ns</value>
    </property>
    <!-- 指定JournalNode在本地磁盘存放数据的位置 -->
    <property>
    <name>dfs.journalnode.edits.dir</name>
    <value>/usr/local/hadoop/journaldata</value>
    </property>
    <!-- 开启机器故障自动切换主从机器 -->
    <property>
    <name>dfs.ha.automatic-failover.enabled</name>
    <value>true</value>
    </property>
    <!-- 指定failover切换的方法(java类的名称) -->
    <property>
    <name>dfs.client.failover.proxy.provider.ns</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>
    <!-- 指定failover切换的方法,这里使用ssh通信方式交换 -->
    <property>
    <name>dfs.ha.fencing.methods</name>
    <value>
    sshfence
    shell(/bin/true)
    </value>
    </property>
    <!-- ssh切换方法需要指定私钥文件位置 -->
    <property>
    <name>dfs.ha.fencing.ssh.private-key-files</name>
    <value>/root/.ssh/id_rsa</value>
    </property>
    </configuration>
  • 注意

    • 假设当备份份数为2时,现在有三台DataNode机器,文件被分为2个block,block1位于1和2上,block2位于1和3上,这是若机器3宕机了,hdfs会在设定的dfs.namenode.heartbead.recheck-interval时间间隔内检查出机器3(在此时间间隔内可能会出现文件数量紊乱的现象),此时block2数量变为1,hdfs会自动将1中的block2复制一份到另外一台可用机器上(此处为2)。当机器3恢复运行时,3中备份的block2会自动删除。
    • 当使用jdbc访问hdfs时,不会使用hdfs-site.xml中的dfs.replication,而会默认使用3,可在java的configuration中配置为指定值
  1. 修改slaves文件
vim $HADOOP_HOME/etc/hadoop/slaves
  • 作用:为各个master指定为其工作的slave

  • 需要修改的机器:master、master1

  • 内容

    slave1
    slave2
    slave3
  1. 修改yarn-site.xml
vim $HADOOP_HOME/etc/hadoop/yarn-site.xml
  • 作用:yarn集群的核心配置文件

  • 需要修改的机器:master、master1、slave1、slave2、slave3

  • 内容:

    <configuration>
    <!-- 启用yarn集群的高可用机制 -->
    <property>
    <name>yarn.resourcemanager.ha.enabled</name>
    <value>true</value>
    </property>
    <!-- 指定ResourceManager集群id,可为任意字串 -->
    <property>
    <name>yarn.resourcemanager.cluster-id</name>
    <value>yrc</value>
    </property>
    <!-- 指定两台ResourceManager的名称 -->
    <property>
    <name>yarn.resourcemanager.ha.rm-ids</name>
    <value>rm1,rm2</value>
    </property>
    <!-- 指定两台ResourceManager的主机名 -->
    <property>
    <name>yarn.resourcemanager.hostname.rm1</name>
    <value>master</value>
    </property>
    <property>
    <name>yarn.resourcemanager.hostname.rm2</name>
    <value>master1</value>
    </property>
    <!-- 指定两台ResourceManager的web端口,正常情况为8088 -->
    <property>
    <name>yarn.resourcemanager.webapp.address.rm1</name>
    <value>master:8088</value>
    </property>
    <property>
    <name>yarn.resourcemanager.webapp.address.rm2</name>
    <value>master1:8088</value>
    </property>
    <!-- 指定管理集群的Zookeeper集群的地址及对应端口 -->
    <property>
    <name>yarn.resourcemanager.zk-address</name>
    <value>node0:2181,node1:2181,node2:2181</value>
    </property>
    <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
    </property>
    <property>
    <name>yarn.nodemanager.auxservices.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
    <!-- 指定jar包路径 -->
    <property>
    <name>yarn.application.classpath</name>
    <value>/usr/local/hadoop/etc/hadoop:/usr/local/hadoop/share/hadoop/common/lib/*:/usr/local/hadoop/share/hadoop/common/*:/usr/local/hadoop/share/hadoop/hdfs:/usr/local/hadoop/share/hadoop/hdfs/lib/*:/usr/local/hadoop/share/hadoop/hdfs/*:/usr/local/hadoop/share/hadoop/yarn:/usr/local/hadoop/share/hadoop/yarn/lib/*:/usr/local/hadoop/share/hadoop/yarn/*:/usr/local/hadoop/share/hadoop/mapreduce/lib/*:/usr/local/hadoop/share/hadoop/mapreduce/*:/usr/local/hadoop/contrib/capacity-scheduler/*.jar</value>
    </property>
    </configuration>
  1. 修改mapred-site.xml
  • 作用:指定MapReduce操作的基本属性

  • 需要修改的机器:master、master1

  • 内容

    <configuration>
    <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
    </property>
    <property>
    <name>mapreduce.application.classpath</name>
    <value>/usr/local/hadoop/etc/hadoop:/usr/local/hadoop/share/hadoop/common/lib/*:/usr/local/hadoop/share/hadoop/common/*:/usr/local/hadoop/share/hadoop/hdfs:/usr/local/hadoop/share/hadoop/hdfs/lib/*:/usr/local/hadoop/share/hadoop/hdfs/*:/usr/local/hadoop/share/hadoop/yarn:/usr/local/hadoop/share/hadoop/yarn/lib/*:/usr/local/hadoop/share/hadoop/yarn/*:/usr/local/hadoop/share/hadoop/mapreduce/lib/*:/usr/local/hadoop/share/hadoop/mapreduce/*:/usr/local/hadoop/contrib/capacity-scheduler/*.jar</value>
    </property>
    </configuration>
  • 注意:

    • MapReduce是不一定依赖yarn的,但一般使用yarn框架来实现MapReduce
    • 此项若是不配,一些job只会在本机跑,而不会分发给其他机器
  1. 修改hive-site.xml
vim $HIVE_HOME/conf/hive-site.xml
  • 作用:hive的基本配置

  • 需要修改的机器:master、master1

  • 内容:

    • 修改hive.server2.webui.host

      <property>
      <name>hive.server2.webui.host</name>
      <value>${hostname}</value>
      <description>The host address the HiveServer2 WebUI will listen on</description>
      </property>

      其中${hostname}需要改成对应的主机名称(master与master1),或者都改为0.0.0.0

    • 修改hive.server2.bind.host

      <property>
      <name>hive.server2.thrift.bind.host</name>
      <value>${hostname}</value>
      <description>Bind host on which to run the HiveServer2 Thrift service.</description>
      </property>

      其中${hostname}需要改成对应的主机名称(master与master1),或者都改为0.0.0.0

    • 修改hive.server2.zookeeper.namespace

      <property>
      <name>hive.server2.zookeeper.namespace</name>
      <value>hiveserver2</value>
      <description>The parent node in ZooKeeper used by HiveServer2 when supporting dynamic service discovery.</description>
      </property>

      注意两个hiveserver节点的该值应设置为一样,指定了改值后,每当一个hiveserver节点启动时,在Zookeeper集群中,目录树下的hiveserver2文件夹下就可以看到该节点注册到Zookeeper中。

    • 修改javax.jdo.option.ConnectionURLjavax.jdo.option.ConnectionPasswordjavax.jdo.option.ConnectionDriverName

      <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://master:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false&amp;allowPublicKeyRetrieval=true</value>
      <description>
      JDBC connect string for a JDBC metastore.
      To use SSL to encrypt/authenticate the connection, provide database-specific SSL flag in the connection URL.
      For example, jdbc:postgresql://myhost/db?ssl=true for postgres database.
      </description>
      </property>
      <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>root</value>
      <description>password to use against metastore database</description>
      </property>
      <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>com.mysql.jdbc.Driver</value>
      <description>Driver class name for a JDBC metastore</description>
      </property>

      这里两个节点的数据库连接字符串也应该是一样的,需要知道的是这里只有master节点安装并运行了mysql服务,存储hive元数据的均在此mysql数据库中,意即存储元数据信息是与master1无关的,实际上mysql服务器可以在网络上的任意位置(此处我一开始也误解了,以为master和master1节点都需要存储hive的元数据)。

      另外,mysql的连接jar包需要下载并复制到hive的lib目录下。

至此整个Hadoop集群已经搭建完毕,却未完,需要使用Zookeeper集群来实现集群的高可用性(HA)。

4.3.3 Zookeeper配置

对于剩下的node0、node1、node2三台机器是用于搭建Zookeeper集群的,因此需要安装并配置Zookeeper-3.4.10:

vim /usr/local/zookeeper/conf/zoo.cfg
# 添加一下内容
server.1=192.168.137.200:2888:3888
server.2=192.168.137.201:2888:3888
server.3=192.168.137.202:2888:3888
# 保存并关闭
4.3.4 ssh免密登录配置
  • 作用:保证任何一台机器都可通过ssh免密访问其他机器,这对于使用sshfence策略的failover机制是很必要的

  • 需要修改的机器:所有机器

  • 内容:

    # 进入用户目录下的`.ssh`目录
    cd
    cd .ssh/
    # 创建公钥私钥对
    ssh-keygen -t rsa
    # 将公钥发送给其他所有节点,hostname需对应每一台机器更改为其主机名执行一遍
    ssh-copy-id ${hostname}

至此,Hadoop集群与Zookeeper集群已搭建完毕,接下来需要启动之。

4.4 集群启动

4.4.1 初始化数据库

在master机器上执行以下操作以初始化数据库:

cd $HIVE_HOME
schematool -initSchema -dbType mysql

此命令执行完之后将会在mysql中创建hive的元数据表,以存储hive的表结构及其他属性。

4.4.2 启动集群

在master机器上执行以下操作以启动集群:

start-all.sh

二 HDFS文件系统

1 简介

1.1 HDFS简介

HDFS,是Hadoop Distributed File System的简称,是Hadoop抽象文件系统的一种实现。Hadoop抽象文件系统可以与本地系统、Amazon S3等集成,甚至可以通过Web协议(webhsfs)来操作。HDFS的文件分布在集群机器上,同时提供副本进行容错及可靠性保证。例如客户端写入读取文件的直接操作都是分布在集群各个机器上的,没有单点性能压力。

1.2 NameNode与DataNode

HDFS由四部分组成,HDFS Client、NameNode、DataNode和Secondary NameNode。整个HDFS集群由Namenode和Datanode构成master-worker(主从)模式。Namenode负责构建命名空间,管理文件的元数据等,而Datanode负责实际存储数据,负责读写工作。

image-20210105151742421

Namenode存放文件系统树及所有文件、目录的元数据。元数据持久化为2种形式:Namespcae image和Edit log。但是持久化数据中不包括Block所在的节点列表,及文件的Block分布在集群中的哪些节点上,这些信息是在系统重启的时候重新构建(通过Datanode汇报的Block信息)。NameNode 的文件结构包含 edits、fsimage、seen_txid、VERSION 以及 in_use.lock。其中, edits 为编辑日志(edit log), 当客户端执行写操作时, NameNode 会先在编辑日志中写入记录,并在内存中保存一个文件系统的元数据。这一描述 符会在编辑日志改动后更新。fsimage 为文件系统镜像,是文件系统元数据的持久检查点,包含以序列化 格式存储的文件系统目录和文件 inodes,每个 inodes 表征一个文件或目录的元数 据信息以及文件的副本数、修改和访问时间等信息。seen_txid 文件代表的是 NameNode 中 edits_*文件的尾数,当 NameNode 重 启后,会按照 seen_txid 所定义的数字,循序从头运行 edits_0000001~至 seen_txid。VERSION 文件是 java 的属性文件,保存了 HDFS 的版本号。in_use.lock 的作用是防止一台机器同时启动多个 NameNode 进程,导致目录 数据不一致。

在HDFS中,Namenode可能成为集群的单点故障,Namenode不可用时,整个文件系统是不可用的。HDFS针对单点故障提供了2种解决机制:
1)备份持久化元数据
将文件系统的元数据同时写到多个文件系统, 例如同时将元数据写到本地文件系统及NFS。这些备份操作都是同步的、原子的。
2)Secondary Namenode
Secondary节点定期合并主Namenode的namespace image和edit log, 避免edit log过大,通过创建检查点checkpoint来合并。它会维护一个合并后的namespace image副本, 可用于在Namenode完全崩溃时恢复数据。Secondary Namenode通常运行在另一台机器,因为合并操作需要耗费大量的CPU和内存。其数据落后于Namenode,因此当Namenode完全崩溃时,会出现数据丢失。 通常做法是拷贝NFS中的备份元数据到Second,将其作为新的主Namenode。 在HA(High Availability高可用性)中可以运行一个Hot Standby,作为热备份,在Active Namenode故障之后,替代原有Namenode成为Active Namenode。SecondaryNameNode 的文件结构主要包括 edits、fsimage、VERSION 以及 in_use.lock。其中edits、fsimage、VERSION的内容均与NameNode相同,in_use.lock 的作用是防止一台机器同时启动多个 SecondaryNameNode 进程导致目录数据不 一致。

而DataNode则会负责存储和提取Block,读写请求可能来自namenode,也可能直接来自客户端。数据节点周期性向Namenode汇报自己节点上所存储的Block相关信息。DataNode通常直接从磁盘读取数据,但是频繁使用的Block可以在内存中缓存。默认情况下,一个Block只有一个数据节点会缓存。但是可以针对每个文件可以个性化配置。 作业调度器可以利用缓存提升性能,例如MapReduce可以把任务运行在有Block缓存的节点上。 用户或者应用可以向NameNode发送缓存指令(缓存哪个文件,缓存多久), 缓存池的概念用于管理一组缓存的权限和资源。DataNode 的文件结构主要包含 BP-前缀文件、blk前缀文件、VERSION 和 in_use.lock。 其中 BP-random integer-NameNode IP address-creation time 的 BP 代表 BlockPool,即 NameNode 的 VERSION 集群中唯一的 blockpoolID;finalized/rbw 目录用于存储 HDFS BLOCK 的数据, blk_前缀文件是 HDFS 中的文件块本身, 存储的是原始文件内容;VERSION 及 in_use.lock 的含义与上述类似。

我们知道NameNode的内存会制约文件数量,HDFS Federation提供了一种横向扩展NameNode的方式。在Federation模式中,每个NameNode管理命名空间的一部分,例如一个NameNode管理/user目录下的文件, 另一个NameNode管理/share目录下的文件。 每个NameNode管理一个namespace volumn,所有volumn构成文件系统的元数据。每个NameNode同时维护一个Block Pool,保存Block的节点映射等信息。各NameNode之间是独立的,一个节点的失败不会导致其他节点管理的文件不可用。 客户端使用mount table将文件路径映射到NameNode。mount table是在Namenode群组之上封装了一层,这一层也是一个Hadoop文件系统的实现,通过viewfs:协议访问。

2 分块机制

2.1 分块机制简介

物理磁盘中有块的概念,磁盘的物理Block是磁盘操作最小的单元,读写操作均以Block为最小单元,一般为512 Byte。文件系统在物理Block之上抽象了另一层概念,文件系统Block物理磁盘Block的整数倍。通常为几KB。Hadoop提供的df、fsck这类运维工具都是在文件系统的Block级别上进行操作。HDFS的Block块比一般单机文件系统大得多,默认为128M。HDFS的文件被拆分成block-sized的chunk,chunk作为独立单元存储。比Block小的文件不会占用整个Block,只会占据实际大小。例如, 如果一个文件大小为1M,则在HDFS中只会占用1M的空间,而不是128M。是为了最小化查找(seek)时间,控制定位文件与传输文件所用的时间比例。假设定位到Block所需的时间为10ms,磁盘传输速度为100M/s。如果要将定位到Block所用时间占传输时间的比例控制1%,则Block大小需要约100M。 但是如果Block设置过大,在MapReduce任务中,Map或者Reduce任务的个数 如果小于集群机器数量,会使得作业运行效率很低。Block的拆分使得单个文件大小可以大于整个磁盘的容量,构成文件的Block可以分布在整个集群, 理论上,单个文件可以占据集群中所有机器的磁盘。 Block的抽象也简化了存储系统,对于Block,无需关注其权限,所有者等内容(这些内容都在文件级别上进行控制)。Block作为容错和高可用机制中的副本单元,即以Block为单位进行复制。

2.2 块分配流程

通常当一个客户端a机器发起请求分配块请求,NN接收到请求后,执行如下块分配流程:
1) 如果a不是一个DataNode,则在集群范围内随机选择一个节点作为目标节点,否则执行下面的2,3步骤;
2) 判断a机器是否符合存储数据块的目标节点,如果符合,第一个块副本分配完毕;
3)如果a机器不符合作为目标节点,则在于与a机器同机架范围内寻找,如果找到目标节点,第一个块副本分配完毕;
4)如果在同一个机架内未找到符合要求的目标节点,则在集群内随机查找,找到则第一个块副本分配完毕,否则未找到符合条件的块,块分配失败;
5)如果已经成功分配第一个块副本,则与a不同机架的远程机架内寻找目标节点,如果符合,第二个块副本分配完毕;
6)如果在远程机架内未找到符合要求的目标节点,在与a相同的本机架寻找,如果找到则第二个块副本分配完毕;否则未找到符合条件的块,第二份块分配失败;
7)如果前2个块副本分配成功,则准备分配第三个副本的目标节点,首先会判断前两份是否在同一个机架,如果是,则在远程机架寻找目标节点,找到则第三份副本分配完毕;如果前两份在不同机架,则在与a相同机架内寻找,如果找到则第三份副本分配完毕,否则在集群范围寻找,找到则第三份分配完毕,否则第三份分配失败
8)如果块副本大于三分,则在集群范围内随机寻找节点

当在一个范围内找到一个节点后,还需要经过如上的条件判断,才能确定一个DataNode进程是否可以作为目标节点:
1) 如果没有节点机器被选择,则该节点可以作为备选节点,否则需要判断下一个DataNode是否符合要求;(这样就防止同一个块副本存储到同一台机器)
2) 然后判断节点是否退役,存储空间是否足够,负载是否大于2倍平均负载,本机架选择的节点是否超过限制,如果均满足,则该datanode符合要求,否则需要判断下一个DataNode是否符合要求

3 备份策略

3.1 备份策略简介

对于HDFS而言,由Namenode负责这个集群的数据备份和分配,在分配过程中,主要考虑下面两个因素:

  • 数据安全:在某个节点发生故障时,不会丢失数据备份;
  • 网络传输开销:在备份数据同步过程中,尽量减少网络传输中的带宽开销;

这两个因素看起来是有些相互矛盾的:想要保证数据安全,那么就尽量把数据备份到多台节点上,但是就需要向多个节点传输数据;想要减少网络传输开销,那么就尽可能把数据备份到一个节点内部或者一个机架内部,因为系统内部的数据传输速度会远大于网络传输的速度。

3.2 数据块部署

对于巨大的集群来说,把所有的节点都部署在一个平行的拓扑结构里是不太现实的。比较实际且通用的做法是,把所有的节点分布到多个Rack(服务器机架)上。每个Rack上的节点共享一个交换机,Rack之间可以使用一个或者多个核心交换机进行互联。在大多数情况下,同一Rack中的节点间通信的带宽肯定会高于不同Rack间节点的通信带宽。HDFS默认两个节点之间的网络带宽与他们的物理距离成正比。从一个节点到其父节点的距离被认为是常数1。这样,两个节点之间的距离可以通过将其到各级祖先节点的距离相加计算出来。两个节点之间的距离越短,就意味着他们之间传输数据的带宽越大。
HDFS允许管理员通过配置脚本,返回一个节点的rack标识符,作为节点地址。NameNode位于整个结构的最中央,负责解析每一个DataNode的rack位置。当DataNode注册到NameNode时,NameNode会运行这些配置脚本,确定节点属于哪个rack。如果没有进行脚本配置,NameNode则会认为所有的节点都属于一个默认的Rack。

数据块备份的部署对于HDFS数据的可靠性和读写性能都有至关重要的影响。良好的数据块部署策略能够有效地改进数据的可靠性,可用性,甚至提高网络带宽的利用率。目前的HDFS系统提供了可配置的数据块部署策略接口,以此来让用户和研究人员能够对不同的部署策略进行测验,从而达到对系统应用进行优化的目的。

缺省的HDFS数据块部署策略企图在降低数据写入代价,最大化数据可靠性,可用性,以及整合读数据带宽等几个方面做出权衡。当一个新的数据块被创建,HDFS会把第一个数据开备份放到写入程序所在的位置。第二个和第三个数据块备份会被部署到不同rack的其他两个不同的节点。剩余的数据块备份则被放到随机的节点上,但是限制每个节点不会部署多于一个相同的数据块,每个rack不会部署都与两个相同的数据块(如果条件满足的话)。之所以要把第二个和第三个数据块备份放到不同的rack上,是为了考虑到一个集群上的文件所应当具有的分布性。如果头两个数据块备份放到相同的rack上,那么对于任何文件来说,其2/3的文件块会被存放在同一rack上。

在所有目标节点都被选择后,这些节点会被有组织地按照其亲近程度,以流水线的方式被传输到第一个备份上。数据会被以这个顺序推送到节点。在读取的时候,NameNode首先会检查客户端所对应的主机是否位于集群当中。如果是,那么数据块的位置会被返回到客户端,并以按照距离远近排序。然后数据块就会按照顺序从DataNode中进行读取。
这一策略会降低rack之间以及节点之间的写入时间,普遍提高写入效率。因为rack故障的几率远低于节点故障的几率,所以该策略不会影响到数据的有效性和可用性。在大多数使用3数据块备份的情况下,该策略能够降低网络带宽的消耗,因为一个数据块只需要部署到两个不同的rack上,而不是3个。

3.3 备份策略

NameNode会尽力保证们每个数据块都有所需的备份数量。当Block Report从DataNode提交上来的时候,NameNode会侦测到数据块的备份数量是少于所需还是超过所需。当超过时,NameNode会选择删除一个数据备份。NameNode倾向于不减少rack的数量,并在DataNode中选择一个剩余磁盘空间最小的节点进行备份移除。这样做的主要目的是平衡利用DataNode的存储空间,并其不降低到数据块的可用性。

当一个数据块处于低于其备份需求数的状态时,该数据块就会被放入到备份优先队列中。仅拥有一个数据备份的数据块处于最高优先级,其数据备份数高于其备份因子2/3的数据块则处于最低优先级。有一个后台进程专门负责定期对备份优先队列进行扫面,以确定将备份部署到何处。数据块备份遵循与数据块部署相似的策略。如果数据块当前只有一个备份,那么HDFS会把一个新的备份放到不同rack上。如果数据块当前有两个备份,并且连个备份都存在与相同的rack上,第三个备份就会被放到不同的rack上。否则,第三个备份就被放到同一rack的不同节点上。这么做的目的也是为了降低创建备份的代价。

NameNode也会确保不把所有的数据块备份都部署到同一个rack上。如果NameNode侦测到某数据块的所有备份都在一个rack上,那么它就会把这个数据块当做是mis-replicated(误备份),然后它就会用上面所提到的策略,在其他的rack上把这个数据块再备份一次。在NameNode收到异地rack备份成功后,该数据块就成为了“备份数量高于所需备份数”状态。此时NameNode会根据策略把本地的一个备份删除,因为策略规定不能减少rack的数量。

4 文件一致性

4.1 一致性简介

客户端来看,一致性主要指的是多并发访问时更新过的数据如何获取的问题。从服务端来看,则是更新如何复制分布到整个系统,以保证数据最终一致。一致性是因为有并发读写才有的问题,因此在理解一致性的问题时,一定要注意结合考虑并发读写的场景。

从客户端角度,多进程并发访问时,更新过的数据在不同进程如何获取的不同策略,决定了不同的一致性。对于关系型数据库,要求更新过的数据能被后续的访问都能看到,这是强一致性。如果能容忍后续的部分或者全部访问不到,则是弱一致性。如果经过一段时间后要求能访问到更新后的数据,则是最终一致性

服务端角度,如何尽快将更新后的数据分布到整个系统,降低达到最终一致性的时间窗口,是提高系统的可用度和用户体验非常重要的方面。对于分布式数据系统:

  • N — 数据复制的份数
  • W — 更新数据时需要保证写完成的节点数
  • R — 读取数据的时候需要读取的节点数

如果W+R>N,写的节点和读的节点重叠,则是强一致性。例如对于典型的一主一备同步复制的关系型数据库,N=2,W=2,R=1,则不管读的是主库还是备库的数据,都是一致的。

如果W+R<=N,则是弱一致性。例如对于一主一备异步复制的关系型数据库,N=2,W=1,R=1,则如果读的是备库,就可能无法读取主库已经更新过的数据,所以是弱一致性。

对于分布式系统,为了保证高可用性,一般设置N>=3。不同的N,W,R组合,是在可用性和一致性之间取一个平衡,以适应不同的应用场景。

  • 如果N=W,R=1,任何一个写节点失效,都会导致写失败,因此可用性会降低,但是由于数据分布的N个节点是同步写入的,因此可以保证强一致性。
  • 如果N=R,W=1,只需要一个节点写入成功即可,写性能和可用性都比较高。但是读取其他节点的进程可能不能获取更新后的数据,因此是弱一致性。这种情况下,如果W<(N+1)/2,并且写入的节点不重叠的话,则会存在写冲突

4.2 一致性模型

HDFS牺牲了一些POSIX的需求来补偿性能,所以有些操作可能会和传统的文件系统不同。当创建一个文件时,它在文件系统的命名空间中是可见的,代码如下:

Path p = new Path("p");
fs.create(p);
assertThat(fs.exists(p),is(true));

但是对这个文件的任何写操作不保证是可见的,即使在数据流已经刷新的情况下,文件的长度很长时间也会显示为0 :

Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p),getLen(),is(0L));

一旦一个数据块写人成功,那么大家提出的新请求就可以看到这个块,而对当前写入的块,大家是看不见的。HDFS提供了使所有缓存和DataNode之间的数据强制同步的方法,这个方法是FSDataOutputStream中的sync()函数。当sync()函数返回成功时,HDFS就可以保证此时写入的文件数据是一致的并且对于所有新的用户都是可见的。即使HDFS客户端之间发生冲突,也会导致数据丢失,代码如下:

Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
out.sync();
assertThat(fs.getFileStatus(p),getLen(),is(((long) "content" .length()));

这个操作类似于UNIX系统中的fsync系统调用,为一个文件描述符提交缓存数据,利用Java API写入本地数据,这样就可以保证看到刷新流并且同步之后的数据,代码如下:

FileOutputStream out = new FileOutStream(localFile);
out.write("content".getBytes("UTF-8"));
out.flush(); // flush to operatig system
out.getFD().sync(); // sync to disk
assertThat(fs.getFileStatus(p),getLen(),is(((long) "content" .length()));

在HDFS中关闭一个文件也隐式地执行了sync()函数,代码如下:

Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.close();
assertThat(fs.getFileStatus(p),getLen(),is(((long) "content" .length()));

文件系统的一致性与设计应用程序的方法有关。如果不调用sync(),那么需要做好因客户端或者系统发生故障而丢失部分数据做好准备。对大多数应用程序来说,这是不可接受的,所以需要在合适的地方调用sync(),比如在写入一定量的数据之后。尽管sync()被设计用来最大限度地减少HDFS的负担,但是它仍然有不可忽视的开销,所以需要在数据健壮性和吞吐最之间做好权衡,其中一个较好的参考平衡点就是:通过测试应用程序来选择不同sync()频率间的最佳平衡点。

5 文件读写

5.1 写策略

应用程序通过创建新文件以及向新文件写数据的方式,给HDFS系统添加数据。文件关闭以后,被写入的数据就无法再修改或者删除,只有以“追加”方式重新打开文件后,才能再次为文件添加数据。HDFS采用单线程写,多线程读的模式。

HDFS客户端需要首先获得对文件操作的授权,然后才能对文件进行写操作。在此期间,其他的客户端都不能对该文件进行写操作。被授权的客户端通过向NameNode发送心跳信号来定期更新授权的状态。当文件关闭时,授权会被回收。文件授权期限分为软限制期和硬限制期两个等级。当处于软限制期内时,写文件的客户端独占对文件的访问权。当软限制过期后,如果客户端无法关闭文件,或没有释放对文件的授权,其他客户端即可以预定获取授权。当硬限制期过期后(一小时左右),如果此时客户端还没有更新(释放)授权,HDFS会认为原客户端已经退出,并自动终止文件的写行为,收回文件控制授权。文件的写控制授权并不会阻止其他客户端对文件进行读操作。因此一个文件可以有多个并行的客户端对其进行读取。

HDFS文件由多个文件块组成。当需要创建一个新文件块时,NameNode会生成唯一的块ID,分配块空间,以及决定将块和块的备份副本存储到哪些DataNode节点上。DataNode节点会形成一个管道,管道中DataNode节点的顺序能够确保从客户端到上一DataNode节点的总体网络距离最小。文件的则以有序包(sequence of packets)的形式被推送到管道。应用程序客户端创建第一个缓冲区,并向其中写入字节。第一个缓冲区被填满后(一般是64 KB大小),数据会被推送到管道。后续的包随时可以推送,并不需要等前一个包发送成功并发回通知(这被称为“未答复发送”——译者注)。不过,这种未答复发送包的数目会根据客户端所限定的“未答复包窗口”(outstanding packets windows)的大小进行限制。

在数据写入HDFS文件后,只要文件写操作没有关闭,HDFS就不保证数据在此期间对新增的客户端读操作可见。如果客户端用户程序需要确保对写入数据的可见性,可以显示地执行hflush操作。这样,当前的包就会被立即推送到管道,并且hflush操作会一直等到所有管道中的DataNode返回成功接收到数据的通知后才会停止。如此就可以保证所有在执行hflush之前所写入的数据对试图读取的客户端用户均可见。

在一个集群的数千个节点里,节点失效(往往是因为存储故障造成的)每天都有可能发生。DataNode中所包含的文件块备份可能会因为内存、磁盘或者网络的错误而造成损坏。为了避免这种错误的形成,HDFS会为其文件的每个数据块生成并存储一份Checksum(总和检查)。Checksum主要供HDFS客户端在读取文件时检查客户端,DataNode以及网络等几个方面可能造成的数据块损坏。当客户端开始建立HDFS文件时,会检查文件的每个数据块的checksum序列,并将其与数据一起发送给DataNode。 DataNode则将checksum存放在文件的元数据文件里,与数据块的具体数据分开存放。当HDFS读取文件时,文件的每个块数据和checksum均被发送到客户端。客户端会即时计算出接受的块数据的checksum, 并将其与接受到的checksum进行匹配。如果不匹配,客户端会通知NameNode,表明接受到的数据块已经损坏,并尝试从其他的DataNode节点获取所需的数据块。

当客户端打开一个文件进行读取时,会从NameNode中获得一个文件数据块列表,列表中包含了每一个所需的数据块的具体位置。这些位置会按照与客户端的距离进行排序。当具体进行数据块读取时,客户端总是尝试首先从最近的位置获取数据。如果尝试失败,客户端会根据排序的顺寻,从下一个位置获取数据。下列情况可能会造成数据读取失败:DataNode不可用,节点不再包含所需数据块,或者数据块备份损坏,以及checksum验证失败。
HDFS允许客户端从正在进行写操作的文件中读取数据。当进行这样的操作时,目前正在被写入的数据块对于NameNode来说是未知的。在这样的情况下,客户端会从所有数据块备份中挑选一个数据块,以这个数据块的最后长度作为开始读取数据之前的数据长度。HDFS I/O的设计是专门针对批处理系统进行优化的,比如MapReduce系统,这类系统对顺序读写的吞吐量都有很高的要求。针对于那些需要实时数据流以及随机读写级别的应用来说,系统的读/写响应时间还有待于优化,目前正在做这方面的努力。

5.2 读策略

读相对于写,简单一些,详细步骤如下:

  1. client访问NameNode,查询元数据信息,获得这个文件的数据块位置列表,返回输入流对象。
  2. 就近挑选一台datanode服务器,请求建立输入流 。
  3. DataNode向输入流中中写数据,以packet为单位来校验。
  4. 关闭输入流

三 MapReduce计算

1 简介

MapReduce是Google提出的一个软件架构,也是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念”Map(映射)”和”Reduce(归约)”,是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。

2 工作原理

2.1 Map

程序根据输入格式将输入文件划分为多个部分,每个部分均用作映射。任务输入,每个map任务都有一个内存缓冲区,输入数据在映射阶段进行处理,中间结果被写入存储缓冲区,并由写入数据的一方确定。当数据达到内存缓冲区阈值(默认值为0.8)时,会启动线程将内存中溢出的数据写入磁盘,并继续缓冲而不影响映射的中间结果。在溢出写入过程中,MapReduce框架对键进行排序。如果中间结果相对较大,则会形成多个溢出文件。最后缓冲区中的数据也被写入磁盘,从而形成一个溢出文件(至少一个溢出文件)。如果有多个溢出文件,则所有文件最后都会合并为一个文件。

2.2 Reduce

当所有的映射任务完成后,每个映射任务会形成一个最终文件,并且该文件按区划分。reduce 任务启动之前,一个映射任务完成后,就会启动线程来拉取映射结果数据到相应的规约任务,不断地合并数据,为规约的数据输入做准备,当所有的射任务完成完成后,数据也拉取合并完毕后,规约任务启动,最终将输出结果存入HDFS 上。

3 MapReduce运行流程

3.1 分析 MapReduce 执行过程

MapReduce运行的时候,输入输出都是HDFS中的文件,首先,Mapper中运行的任务会去读取HDFS中的数据文件,通过调用map中的方法来处理数据,然后将处理结果输出给Reducer任务,Reducer将接收到的结果作为自己的输入数据,并且也调用自己的方法,将最后的结果输出到HDFS中。过程如下图所示:

img

3.2 Mapper 任务执行过程详解

每个Mapper 任务是一个java 进程,它会读取HDFS 中的文件,解析成很多的键值对,经过我们覆盖的map 方法处理后,转换为很多的键值对再输出。整个Mapper 任务的处理过程又可以分为以下几个阶段:

  1. 第一阶段是把输入文件按照一定的标准分片(InputSplit),每个输入片的大小是固定的。默认情况下,输入片(InputSplit)的大小与数据块(Block)的大小是相同的。如果数据块(Block)的大小是默认值64MB,输入文件有两个,一个是32MB,一个是72MB。那么小的文件是一个输入片,大文件会分为两个数据块,那么是两个输入片。一共产生三个输入片。每一个输入片由一个Mapper 进程处理。这里的三个输入片,会有三个Mapper 进程处理。
  2. 第二阶段是对输入片中的记录按照一定的规则解析成键值对。有个默认规则是把每一行文本内容解析成键值对。“键”是每一行的起始位置(单位是字节),“值” 是本行的文本内容。
  3. 第三阶段是调用Mapper 类中的map 方法。第二阶段中解析出来的每一个键值对,调用一次map 方法。如果有1000 个键值对,就会调用1000 次map方法。每一次调用map 方法会输出零个或者多个键值对。
  4. 第四阶段是按照一定的规则对第三阶段输出的键值对进行分区。比较是基于键进行的。比如我们的键表示省份(如北京、上海、山东等),那么就可以按照不同省份进行分区,同一个省份的键值对划分到一个区中。默认是只有一个区。分区的数量就是Reducer 任务运行的数量。默认只有一个Reducer 任务。
  5. 第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。如果有
  6. 第六阶段,那么进入第六阶段;如果没有,直接输出到本地的linux 文件中。第六阶段是对数据进行归约处理,也就是reduce 处理。键相等的键值对会调用一次reduce 方法。经过这一阶段,数据量会减少。归约后的数据输出到本地的linux文件中。本阶段默认是没有的,需要用户自己增加这一阶段的代码。

上述过程如下图所示:

img

3.3 Reduce任务执行过程详解

每个Reducer 任务是一个java 进程。Reducer 任务接收Mapper 任务的输出,归约处理后写入到HDFS 中,可以分为如下几个阶段。

  1. 第一阶段是Reducer 任务会主动从Mapper 任务复制其输出的键值对。Mapper 任务可能会有很多,因此Reducer 会复制多个Mapper 的输出。
  2. 第二阶段是把复制到Reducer 本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。
  3. 第三阶段是对排序后的键值对调用reduce 方法。键相等的键值对调用一次reduce 方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到HDFS 文件中。

在整个MapReduce 程序的开发过程中,我们最大的工作量是覆盖map 函数和覆盖reduce 函数。上述过程如下图所示:

img

3.4 键值对的编号

在对Mapper 任务、Reducer 任务的分析过程中,会看到很多阶段都出现了键值对,这里对键值对进行编号,方便理解键值对的变化情况,对于Mapper 任务输入的键值对,定义为key1 和value1。在map 方法中处理后,输出的键值对,定义为key2 和value2。reduce 方法接收key2 和value2,处理后,输出key3 和value3。具体如下图所示:

img

4 MapReduce实现

MapReduce 在执行的时候,先执行Map 函数,再执行Reduce 函数,达到分布式并行运算效果。其中Map 函数与Reduce 函数需要用户自行设计,而这两个任务也定义了任务本身。

4.1 统计平均通话次数

需要用到的数据为:主叫号码。仅需统计每个主叫号码在数据文件中出现的次数,用该数除以天数,即可得到平均值。与WordCount 类似,提取出第二列数据即可进行。具体代码如下图所示:

img

结果如下图所示,其中每一行有两个数据,第一列为主叫号码,第二列为平均每日通话次数:

img

4.2 统计不同通话类型下各运营商占比

需要用到的数据为:通话类型,主叫号码运营商,被叫号码运营商。与第一个需求不同,本需求的key 值由两个数共同组成,需要定义一个新的类用来包含通话类型和运行商。定义如下:

img

其中需要注意,由于key 中数据的个数不唯一,需要对内部的数据优先级进行规定,所以要重写compareTo 函数,对key 进行排序。主函数中需要注意Map 阶段的输出类型和Reduce 阶段的输入输出类型,代码如下:

img

结果如下图,其中1代表电信,2 代表移动,3 代表联通:

img

img

img

4.3 统计用户在各个时间段通话时长所占比例

所需用到的数据有:主叫电话号码,通话开始时间,通话结束时间。最后输出的形式是一长串的数组,因此同样需要设计一个新的类用于封装。与上一需求不同的是,该类用于value 的输入和输出。代码如下:

img

为了完成统计任务,还需要自己求出每个时间段的时间,才能统计占比。这需要设计统计时间段内持续时间的算法,代码如下:

img

主函数与上一需求类似,区别在于Reduce 阶段的结果输出为浮点型数据,需要再同最开始一样重新定义一个浮点结果类,才能输出。

结果如下图所示,每一行有9 个数据,第一列为主叫号码,第二到第八列为对应时间段中通话时长占一天的比例:

img