# BigDataProject

**Repository Path**: yangyangyang123456/BigDataProject

## Basic Information

- **Project Name**: BigDataProject
- **Description**: No description available
- **Primary Language**: Scala
- **License**: Not specified
- **Default Branch**: yangyang
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2020-08-05
- **Last Updated**: 2020-12-19

## Categories & Tags

**Categories**: Uncategorized
**Tags**: None

## README

# BigDataProject

#### Introduction

Big data processing: the complete pipeline from data collection through cleaning, computation (offline + real-time), storage, and querying.

#### Software Architecture

The stack consists of the following components:

- apache-hive-1.2.2-bin
- hadoop-2.8.0
- jdk1.8.0_201
- kafka_2.11-2.0.1
- scala-2.11.12
- spark-2.1.1-bin-hadoop2.7
- zookeeper-3.4.9
- apache-flume-1.8.0-bin.tar.gz
- mysql-5.7.26-linux-glibc2.12-x86_64.tar.gz
- mysql-connector-java-5.1.47.jar
- sqoop-1.4.7.bin__hadoop-2.6.0

#### Installation Notes

1. All installation packages are in /home/software/
2. All installed software is in /usr/local/
3. The MySQL password is recorded in /root/mysql-pwd.txt
4. The test jar packages are in /home/test
5. All data is in /home/data

## (1) Module Overview

Big data in the Internet finance domain requires a complete data collection system, covering everything from data collection to final data presentation, in order to support user decision-making.

## (2) Requirements Analysis

1. Data collection
   * Part of the data comes from business systems; Sqoop extracts this business data into the big data platform.
   * Part of the data comes from server logs, collected with Flume: real-time data goes into Kafka, and offline data goes into HDFS for downstream computation and analysis.
2. Data processing
   * Clean the raw data with Spark Core.
   * Analyze the cleaned data with Spark SQL.
3. Data storage
4. Data presentation

## (3) Technical Approach

1. Data collection logic (ETL)
   * Logs are generally stored on log servers and are pulled onto HDFS by Flume.
   * Business data is read from the relational database with Sqoop.
2. Data cleaning logic
   * Use Spark Core to clean the data.
3. Analysis and computation for each business module
   * Use Spark SQL to analyze and process the data.

## (4) Experimental Data and Schemas

Each dataset below is listed with its MySQL source schema (where one exists) and the Hive table it is loaded into; a minimal Spark query over these tables is sketched after the list.

* loan_detail

  hive:

  ```sql
  CREATE TABLE loan_detail(
    dt string,
    customer_id string,
    product_code string,
    loan_id string,
    loan_status string,
    kpi string,
    issue_amount bigint,
    principal_payable bigint,
    overdue_principal_payable bigint,
    principal_repay bigint,
    unionloan_name string
  )
  ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\u0001'
    LINES TERMINATED BY '\n'
  STORED AS
    INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
  ```

* bank_detail

  mysql:

  ```sql
  CREATE TABLE `bank_detail` (
    `uid` int(10) NOT NULL,
    `tradetime` varchar(10) DEFAULT NULL,
    `tradetype` varchar(10) DEFAULT NULL,
    `tradeacount` double DEFAULT NULL,
    `salary` varchar(1) DEFAULT NULL,
    `badid` int(11) NOT NULL AUTO_INCREMENT,
    PRIMARY KEY (`badid`)
  ) ENGINE=InnoDB AUTO_INCREMENT=6070198 DEFAULT CHARSET=latin1;
  ```

  hive:

  ```sql
  create table data.bank_detail(
    uid bigint,
    tradetime string,
    tradetype string,
    tradeacount string,
    salary string,
    badid bigint
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
  ```

* bank

  mysql:

  ```sql
  CREATE TABLE `bank` (
    `bankid` int(11) NOT NULL,
    `bankname` varchar(20) CHARACTER SET utf8 NOT NULL,
    PRIMARY KEY (`bankid`)
  ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
  ```

  hive:

  ```sql
  create table data.bank(
    bankid bigint,
    bankname string
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
  ```

* bill_detail

  mysql:

  ```sql
  CREATE TABLE `bill_detail` (
    `uid` int(10) NOT NULL,
    `billtime` varchar(10) DEFAULT NULL,
    `bankid` varchar(10) DEFAULT NULL,
    `previousbills` float DEFAULT NULL,
    `lastrepayment` float DEFAULT NULL,
    `creditlimit` float DEFAULT NULL,
    `currentBillBalance` float DEFAULT NULL,
    `minimumRepaymentOfCurrentBills` float DEFAULT NULL,
    `consumptionPenNumber` int(10) DEFAULT NULL,
    `currentBillAmount` float DEFAULT NULL,
    `adjustmentAmount` float DEFAULT NULL,
    `revolvingInterest` float DEFAULT NULL,
    `availableBalance` float DEFAULT NULL,
    `cashAdvanceLimit` float DEFAULT NULL,
    `repaymentStatus` varchar(10) DEFAULT NULL,
    `bidid` int(11) NOT NULL AUTO_INCREMENT,
    PRIMARY KEY (`bidid`)
  ) ENGINE=InnoDB AUTO_INCREMENT=2338119 DEFAULT CHARSET=latin1;
  ```

  hive:

  ```sql
  create table data.bill_detail(
    uid bigint,
    billtime string,
    bankid string,
    previousbills double,
    lastrepayment double,
    creditlimit double,
    currentBillBalance double,
    minimumRepaymentOfCurrentBills double,
    consumptionPenNumber bigint,
    currentBillAmount double,
    adjustmentAmount double,
    revolvingInterest double,
    availableBalance double,
    cashAdvanceLimit double,
    repaymentStatus string,
    bidid bigint
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
  ```

* browse_history

  mysql:

  ```sql
  CREATE TABLE `browse_history` (
    `uid` int(10) NOT NULL,
    `browsetime` varchar(10) DEFAULT NULL,
    `browsedata` varchar(10) DEFAULT NULL,
    `behaviorno` varchar(10) DEFAULT NULL,
    `bhid` int(11) NOT NULL AUTO_INCREMENT,
    PRIMARY KEY (`bhid`)
  ) ENGINE=InnoDB AUTO_INCREMENT=22919548 DEFAULT CHARSET=latin1;
  ```

  hive:

  ```sql
  create table data.browse_history(
    uid bigint,
    browsetime string,
    browsedata string,
    behaviorno string,
    bhid bigint
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
  ```

* loan_time

  mysql:

  ```sql
  CREATE TABLE `loan_time` (
    `uid` int(10) DEFAULT NULL,
    `lendingTime` varchar(10) DEFAULT NULL,
    `ltid` int(11) NOT NULL AUTO_INCREMENT,
    PRIMARY KEY (`ltid`)
  ) ENGINE=InnoDB AUTO_INCREMENT=55597 DEFAULT CHARSET=latin1;
  ```

  hive:

  ```sql
  create table data.loan_time(
    uid bigint,
    lendingTime string,
    ltid bigint
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
  ```

* overdue

  mysql:

  ```sql
  CREATE TABLE `overdue` (
    `uid` int(10) NOT NULL,
    `sampleLabel` varchar(2) DEFAULT NULL,
    `odid` int(11) NOT NULL AUTO_INCREMENT,
    PRIMARY KEY (`odid`)
  ) ENGINE=InnoDB AUTO_INCREMENT=55597 DEFAULT CHARSET=latin1;
  ```

  hive:

  ```sql
  create table data.overdue(
    uid bigint,
    sampleLabel string,
    odid bigint
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
  ```

* user_info

  mysql:

  ```sql
  CREATE TABLE `user_info` (
    `uid` int(10) NOT NULL,
    `sex` varchar(2) NOT NULL,
    `career` varchar(3) NOT NULL,
    `education` varchar(3) NOT NULL,
    `marriage` varchar(2) NOT NULL,
    `usertype` varchar(3) NOT NULL,
    `birthday` varchar(10) NOT NULL,
    `province` varchar(3) NOT NULL,
    PRIMARY KEY (`uid`)
  ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
  ```

  hive:

  ```sql
  create table data.user_info(
    uid bigint,
    sex string,
    career string,
    education string,
    marriage string,
    usertype string,
    birthday string,
    province string
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
  ```

* province

  mysql:

  ```sql
  CREATE TABLE `province` (
    `pid` int(11) NOT NULL,
    `name` varchar(20) CHARACTER SET utf8 NOT NULL,
    PRIMARY KEY (`pid`)
  ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
  ```

  hive:

  ```sql
  create table data.province(
    pid bigint,
    name string
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
  ```
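As a quick sanity check that the Hive tables above are queryable from Spark, here is a minimal sketch using the Spark 2.1.1 / Scala 2.11 stack listed above. The `HiveTableCheck` object and the bills-per-bank query are illustrative assumptions, not part of the project source; it also assumes Spark was launched with Hive support (hive-site.xml on the classpath).

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sanity-check query over the Hive tables defined above.
object HiveTableCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("HiveTableCheck")
      .enableHiveSupport() // requires hive-site.xml on the classpath
      .getOrCreate()

    // bankid is a string in data.bill_detail but a bigint in data.bank,
    // so cast explicitly before joining.
    spark.sql(
      """SELECT b.bankname, COUNT(*) AS bills
        |FROM data.bill_detail d
        |JOIN data.bank b ON CAST(d.bankid AS BIGINT) = b.bankid
        |GROUP BY b.bankname
        |ORDER BY bills DESC""".stripMargin)
      .show(20, false)

    spark.stop()
  }
}
```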
## (5) Technical Implementation

1. Collecting logs with Flume
   * Start the Flume agent from the Flume root directory.
   * hdfs:

     ```
     bin/flume-ng agent --conf conf --conf-file conf/netcat-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
     ```

   * kafka:

     ```
     bin/flume-ng agent --conf conf --conf-file conf/netcat-kafka.conf --name a1 -Dflume.root.logger=INFO,console
     ```

   * Flume collects the log files into the hdfs://140.143.231.218:9000/flume/events/%Y-%m-%d/%H%M%S/ directory.
2. Data cleaning
   * Identify the clicks on products within the user click logs.
   * Filter out records that do not contain 6 fields.
   * Filter out records whose URL is empty, i.e. keep only log records containing a URL that starts with http.
   * Implementation: clean the data with a Spark program (a minimal sketch of these rules follows this item).
   * [Source and pom file](https://github.com/MrQuJL/area-hot-product/tree/master/02_clean/spark)
   * Package as a jar and submit it to Spark:

     ```
     bin/spark-submit --class clean.CleanData --master spark://qujianlei:7077 ~/jars/people-0.0.1-SNAPSHOT.jar hdfs://qujianlei:9000/flume/20190204/events-.1549261170696 hdfs://qujianlei:9000/testOutput/
     ```
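   The linked source is the authoritative implementation of `clean.CleanData`; the sketch below only illustrates the three filtering rules above. The `CleanDataSketch` name, the tab field delimiter, and the URL being the sixth field are assumptions about the log format.

   ```scala
   import org.apache.spark.{SparkConf, SparkContext}

   // Sketch of the cleaning rules above, not the linked CleanData source.
   // Assumes tab-separated log fields with the URL in the sixth position.
   object CleanDataSketch {
     def main(args: Array[String]): Unit = {
       val Array(inputPath, outputPath) = args
       val sc = new SparkContext(new SparkConf().setAppName("CleanDataSketch"))

       sc.textFile(inputPath)
         .map(_.split("\t", -1))          // -1 keeps trailing empty fields
         .filter(_.length == 6)           // drop records without 6 fields
         .filter(_(5).startsWith("http")) // drop empty / non-http URLs
         .map(_.mkString("\t"))
         .saveAsTextFile(outputPath)

       sc.stop()
     }
   }
   ```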
3. Business statistics with Spark SQL
   * Approach: compute the statistics with Spark SQL (an illustrative sketch follows this list).
   * [Source and pom file](https://github.com/MrQuJL/area-hot-product/tree/master/03_analysis/spark_sql)
   * Submit to the Spark cluster:

     ```
     bin/spark-submit --class hot.HotProductByArea --master spark://qujianlei:7077 ~/jars/people-0.0.1-SNAPSHOT.jar hdfs://qujianlei:9000/input/area/areainfo.txt hdfs://qujianlei:9000/input/product/productinfo.txt hdfs://qujianlei:9000/output/190204/part-r-00000 hdfs://qujianlei:9000/output/analysis
     ```
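The linked `hot.HotProductByArea` source defines the actual computation; the sketch below only illustrates the Spark SQL approach, mirroring the four path arguments in the spark-submit command above (area info, product info, cleaned clicks, output). All input layouts here (tab-separated id/name pairs for the area and product files, and the positions of the area and product ids in the cleaned click records) are assumptions.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical outline of the "hot products by area" statistics step.
object HotProductByAreaSketch {
  def main(args: Array[String]): Unit = {
    val Array(areaPath, productPath, clickPath, outputPath) = args
    val spark = SparkSession.builder().appName("HotProductByAreaSketch").getOrCreate()
    import spark.implicits._

    // Assumed layout: "<areaId>\t<areaName>" per line.
    spark.sparkContext.textFile(areaPath)
      .map(_.split("\t")).map(a => (a(0), a(1)))
      .toDF("areaId", "areaName").createOrReplaceTempView("area")

    // Assumed layout: "<productId>\t<productName>" per line.
    spark.sparkContext.textFile(productPath)
      .map(_.split("\t")).map(p => (p(0), p(1)))
      .toDF("productId", "productName").createOrReplaceTempView("product")

    // Cleaned click records; the field positions of the ids are assumed.
    spark.sparkContext.textFile(clickPath)
      .map(_.split("\t")).map(c => (c(2), c(3)))
      .toDF("areaId", "productId").createOrReplaceTempView("click")

    spark.sql(
      """SELECT a.areaName, p.productName, COUNT(*) AS clicks
        |FROM click c
        |JOIN area a ON c.areaId = a.areaId
        |JOIN product p ON c.productId = p.productId
        |GROUP BY a.areaName, p.productName
        |ORDER BY clicks DESC""".stripMargin)
      .write.json(outputPath)

    spark.stop()
  }
}
```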