Hadoop Big Data Development
Big Data Development Project Report
1. Background
The Hadoop open-source project provides big data processing technology covering the whole big data life cycle: data collection, data storage, resource management and service coordination, data computation, and data analysis. The goal of this project is to deepen the understanding of how Hadoop stores and computes data, to practice the basic techniques already widely used for big data, and to lay a foundation for future research into new big data processing techniques.
2. Project Overview
The project consists of three main tasks:
- Generate 7 days of simulated website access data: decide the record format, which URLs appear, and how product information is categorized.
- Perform simple data cleaning to turn the collected data into analyzable data.
- Use MapReduce to produce statistics: count the site's page views, where the reported figure = the sum of the daily URL hits / 7; use a top-N algorithm to report the top 3 provinces and cities by traffic; and run a simple cleaning step that selects the URLs of product detail pages and product search pages to compile product statistics.
1) Setting up the cluster environment
Two Alibaba Cloud servers were registered, with the following roles:
| | NN | DN |
|---|---|---|
| HDFS | NameNode, DataNode | DataNode, SecondaryNameNode |
| YARN | ResourceManager, NodeManager | NodeManager |
Note: steps 1 to 4 must be carried out on both nodes.
Steps 5 to 9 are carried out on the NN node only and then distributed to the DN node; do NOT start HDFS or YARN before distributing!
Step 1: Set up the Java environment
##Upload the JDK archive to Linux, then unpack the binary Java release
tar -zxvf jdk-xxxxxx.tar.gz -C /usr
cd /usr
mv jdk_xxxxx java
##Configure the environment variables
vim /etc/profile
export JAVA_HOME=/usr/java
export PATH=$PATH:$JAVA_HOME/bin
##Apply the configuration (whenever /etc/profile is changed, it must be re-sourced)
source /etc/profile
##Test whether the installation succeeded; run this from any directory
java -version
Step 2: Disable the firewall
ufw disable
Step 3: Map IPs to hostnames so the cluster machines can reach each other
These are public IPs, which can be found in the cloud provider's console
vim /etc/hosts -----hosts is the locally stored mapping file, acting as a local name server
172.28.30.128 nn ------used by the cluster machines to address each other
172.28.30.127 dn
Step 4: Set up passwordless login to the local machine
ssh localhost <!---check whether a password is required--->
Generate the node's own key pair
ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
ssh localhost <!---check again whether a password is still required--->
exit <!---log out--->
Step 5: Install Hadoop
tar -zxvf hadoop-xxxx.tar.gz -C /opt
cd /opt
mv hadoop_xxxx hadoop
###Edit the environment configuration
vim /etc/profile
export JAVA_HOME=/usr/java
export HADOOP_HOME=/opt/hadoop
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
###Apply the configuration
source /etc/profile
###Verify the installation
hadoop
Step 6: Configure the cluster
-----------in the Hadoop installation directory /opt
cp -r hadoop/ hadoop_bak
cd /opt/hadoop/etc/hadoop
ls
-----------set the Hadoop runtime environment
vim hadoop-env.sh
export JAVA_HOME=/usr/java
-----------configure the port HDFS listens on
vim core-site.xml
Add the following properties inside the <configuration> element
<property>
<name>fs.defaultFS</name>
<value>hdfs://nn:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/hadoop/tmp</value>
</property>
-----------configure the HDFS storage layout
vim hdfs-site.xml
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/var/bigdata/hadoop/distribute/dfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/var/bigdata/hadoop/distribute/dfs/data</value>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>dn:9868</value>
</property>
<property>
<name>dfs.namenode.checkpoint.dir</name>
<value>/var/bigdata/hadoop/distribute/dfs/secondary</value>
</property>
-----------list the DataNode hosts
vim slaves
(remember to delete localhost)
nn
dn
Port | Hadoop 2.x |
---|---|
NameNode internal communication | 8020, 9000 |
NameNode HTTP UI | 50070 |
MapReduce job monitoring (YARN web UI) | 8088 |
History server | 19888 |
core-site.xml:
The temporary files are placed under /opt/hadoop/tmp (hadoop.tmp.dir above), so if anything goes wrong only that tmp directory needs to be deleted.
hdfs-site.xml:
The NameNode and DataNode storage directories are both placed under /var/bigdata/hadoop/.
Step 7: Configure MapReduce
cd /opt/hadoop/etc/hadoop
cp mapred-site.xml.template mapred-site.xml
vim mapred-site.xml
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
-----------configure the YARN framework
vim yarn-site.xml
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>nn</value> <!---make nn the ResourceManager--->
</property>
mapred-site.xml:
yarn-site.xml:
The nn node is both the NameNode and the ResourceManager.
Step 8: Set up passwordless login across nodes
Send the NameNode's public key to every other node
cd ~
ls -lah
cd .ssh/ <!---all keys are visible here, including the public key id_rsa.pub that NN created earlier--->
scp id_rsa.pub root@dn:~/.ssh/nn_rsa.pub
Run on every DataNode
cd ~/.ssh/
cat nn_rsa.pub >> authorized_keys
Verify that nn can log into every dn without a password
ssh dn
If no password is requested, the setup succeeded.
Step 9: Distribute the Hadoop software to the other nodes
If you choose to distribute the JDK rather than install it on each node, do this:
cd /usr
scp -r java root@dn:/usr
Then configure the Java environment on the target node
vim /etc/profile
export JAVA_HOME=/usr/java
export PATH=$PATH:$JAVA_HOME/bin
source /etc/profile
Distribute Hadoop
cd /opt
scp -r hadoop root@dn:/opt
Step 10: Configure the Hadoop environment variables on the other nodes
vim /etc/profile
export JAVA_HOME=/usr/java
export HADOOP_HOME=/opt/hadoop
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
source /etc/profile <!--apply the configuration-->
Step 11: Initialize and start the cluster
On the NameNode
hdfs namenode -format ----------this may only be executed once
start-dfs.sh
On the ResourceManager node
start-yarn.sh
Test on every node
jps
Open http://nn:50070 to view the HDFS web UI
Open http://rm:8088 (the ResourceManager host, i.e. nn here) to view how jobs are running
Memory usage on NN
Memory usage on DN
2) Testing the official word count example
First upload the test file to the cluster
hdfs dfs -mkdir /word
hdfs dfs -put words.txt /word
Then run the following commands
cd /opt/hadoop/share/hadoop/mapreduce/
hadoop jar hadoop-mapreduce-examples-2.6.5.jar wordcount /word /wordout
To delete a file or directory:
hadoop fs -rm -r <path to delete>
3) Generating 7 days of simulated website access data
Each day's volume is kept between 1,000,000 and 2,000,000 records, chosen at random; the data is then produced day by day (7 days in total).
Record format: IP address---access time---URL---device, for example
121.76.232.4---2023-01-06 00:00:01---https://mall.kayla.com/item/2397.html---Safari
Steps:
Step 1: Generate the Date, IP and Device data and store them in date.txt, ip.txt and device.txt respectively (the raw-value "database").
How are they generated?
First prepare the empty date.txt and ip.txt files plus the utility classes, then use the three test classes DateTest, IpTest and AppendFileTest under test to generate them. AppendFileTest appends the records in access.log to dest.log one by one, simulating data being collected and then added to a log.
import java.text.SimpleDateFormat;
import java.util.Date;
public class DateUtil {
public static Date randomDate(String beginDate, String endDate) {
try {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date start = format.parse(beginDate); // parse the start date
Date end = format.parse(endDate); // parse the end date
// getTime() returns the milliseconds since 1970-01-01 00:00:00 GMT represented by this Date object
if (start.getTime() >= end.getTime()) {
return null;
}
long date = random(start.getTime(), end.getTime());
return new Date(date);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
private static long random(long begin, long end) {
long rtn = begin + (long) (Math.random() * (end - begin));
if (rtn == begin || rtn == end) {
return random(begin, end);
}
return rtn;
}
}
import java.util.Random;
public class IPUtil {
private static int[][] range = {{607649792,608174079},
{1038614528,1039007743},
{1783627776,1784676351},
{2035023872,2035154943},
{2078801920,2079064063},
{-1950089216,-1948778497},
{-1425539072,-1425014785},
{-1236271104,-1235419137},
{-770113536,-768606209},
{-569376768,-564133889},
};
public static String getRandomIpAddress(){
Random rdint = new Random();
int index = rdint.nextInt(10);
String ip = num2ip(range[index][0]+new Random().nextInt(range[index][1]-range[index][0]));
return ip;
}
private static String num2ip(int ip) {
int [] b=new int[4] ;
String x = "";
b[0] = (int)((ip >> 24) & 0xff);
b[1] = (int)((ip >> 16) & 0xff);
b[2] = (int)((ip >> 8) & 0xff);
b[3] = (int)(ip & 0xff);
x=Integer.toString(b[0])+"."+Integer.toString(b[1])+"."+Integer.toString(b[2])+"."+Integer.toString(b[3]);
return x;
}
}
A quick test of how the Random class is used
//test the Random class
import java.util.Random;
public class RandomTest {
public void testRandom() {
Random random = new Random();
int[] num = new int[7];
for (int i = 0; i < 7; i++) {
num[i] = random.nextInt(100);
}
for (int i = 0; i < 7; i++) {
System.out.println(num[i]);
}
}
}
Generating the Date data
import generateData.DateUtil;
import org.junit.Test;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;

public class DateTest {
    private static String[][] gap = {
            {"2023-01-01 00:00:00", "2023-01-01 23:59:59"},
            {"2023-01-02 00:00:00", "2023-01-02 23:59:59"},
            {"2023-01-03 00:00:00", "2023-01-03 23:59:59"},
            {"2023-01-04 00:00:00", "2023-01-04 23:59:59"},
            {"2023-01-05 00:00:00", "2023-01-05 23:59:59"},
            {"2023-01-06 00:00:00", "2023-01-06 23:59:59"},
            {"2023-01-07 00:00:00", "2023-01-07 23:59:59"},
    }; // candidate date ranges for each of the 7 days

    @Test
    public void testDate() throws IOException, FileNotFoundException {
        Random random = new Random();
        List<Long> dates = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            dates.clear(); // discard the previous day's data
            int size = random.nextInt(500000) + 1000000; // number of records for this date file
            // generate size timestamps
            for (int j = 0; j < size; j++) {
                Date date = DateUtil.randomDate(gap[i][0], gap[i][1]);
                long time = date.getTime(); // milliseconds since 1970-01-01
                dates.add(time); // add to the list
            }
            Long[] array = new Long[dates.size()];
            Long[] longDate = dates.toArray(array);
            // sort the dates from earliest to latest
            //quickSort(longDate, 0, longDate.length - 1);
            quick3WaySort(longDate, 0, longDate.length - 1);
            // resolve the path of the (still empty) dateN.txt file to write to
            String path;
            switch (i) {
                case 0: path = DateTest.class.getClassLoader().getResource("date1.txt").getPath(); break;
                case 1: path = DateTest.class.getClassLoader().getResource("date2.txt").getPath(); break;
                case 2: path = DateTest.class.getClassLoader().getResource("date3.txt").getPath(); break;
                case 3: path = DateTest.class.getClassLoader().getResource("date4.txt").getPath(); break;
                case 4: path = DateTest.class.getClassLoader().getResource("date5.txt").getPath(); break;
                case 5: path = DateTest.class.getClassLoader().getResource("date6.txt").getPath(); break;
                case 6: path = DateTest.class.getClassLoader().getResource("date7.txt").getPath(); break;
                default: System.out.println("no .txt file specified"); return;
            }
            // write the timestamps to the file in append mode
            BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(new File(path), true)));
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
            for (int j = 0; j < longDate.length; j++) {
                Date date = new Date(longDate[j]);
                String datetime = format.format(date);
                bufferedWriter.write(datetime);
                bufferedWriter.newLine();
            }
            bufferedWriter.flush();
            bufferedWriter.close();
        }
    }

    public void quick3WaySort(Long[] arr, int left, int right) {
        if (left > right) return;
        int lt = left;
        int i = left + 1;
        int gt = right;
        long tem = arr[left];
        while (i <= gt) {
            if (arr[i] < tem) { // elements smaller than the pivot go to the left of lt; lt and i both move right
                //exchange(a, lt++, i++);
                long tmp = arr[i];
                arr[i] = arr[lt];
                arr[lt] = tmp;
                lt++;
                i++;
            } else if (arr[i] > tem) { // elements larger than the pivot go to the right of gt, so gt moves left
                //exchange(a, i, gt--);
                long tmp = arr[i];
                arr[i] = arr[gt];
                arr[gt] = tmp;
                gt--;
            } else {
                i++;
            }
        }
        quick3WaySort(arr, left, lt - 1);
        quick3WaySort(arr, gt + 1, right);
    }

    public static void quickSort(Long[] arr, int start, int end) {
        if (start < end) {
            // use the first element as the pivot
            Long stard = arr[start];
            // remember the boundaries that still need sorting
            int low = start;
            int high = end;
            // move elements smaller/larger than the pivot to the correct side
            while (low < high) {
                // right side already larger than the pivot: no swap needed
                while (low < high && stard <= arr[high]) {
                    high--;
                }
                // the right element smaller than the pivot overwrites the left slot
                arr[low] = arr[high];
                while (low < high && arr[low] <= stard) {
                    low++;
                }
                arr[high] = arr[low];
            }
            arr[low] = stard;
            quickSort(arr, start, low - 1);
            quickSort(arr, low + 1, end);
        }
    }
}
Generating the IP data
import com.cskaoyan.IPUtil;
import org.junit.Test;
import java.io.*;

public class IpTest {
    @Test
    public void testGenerateIp() {
        String address = IPUtil.getRandomIpAddress();
        System.out.println(address);
    }

    @Test
    public void testBatchWriteIpAddressToFile() throws IOException {
        String path = IpTest.class.getClassLoader().getResource("ip.txt").getPath();
        BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(new File(path), true)));
        // randomly generate 400,000 IP addresses
        for (int i = 0; i < 400000; i++) {
            String address = IPUtil.getRandomIpAddress();
            bufferedWriter.write(address);
            bufferedWriter.newLine();
        }
        bufferedWriter.flush();
        bufferedWriter.close();
    }
}
Appending the generated access.log data to the destination file
import org.junit.Test;
import java.io.*;

public class AppendFileTest {
    @Test
    public void testAppend() throws IOException, InterruptedException {
        String path = AppendFileTest.class.getClassLoader().getResource("access.log").getPath();
        BufferedReader bufferedReader = new BufferedReader(new FileReader(new File(path)));
        String content = null;
        String destPath = AppendFileTest.class.getClassLoader().getResource("blog.log").getPath();
        File destFile = new File(destPath);
        BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(destFile, true)));
        while ((content = bufferedReader.readLine()) != null) {
            bufferedWriter.write(content);
            bufferedWriter.newLine();
            bufferedWriter.flush();
            Thread.sleep(1000);
        }
        bufferedWriter.close();
    }
}
Step 2: Run the GenerateData program. It first loads all of the prepared data from the "data warehouse" files into their own Lists. A helper class, ListUtil, takes a List of any type and returns a random element from it (a sketch of such a helper follows below). Only the dates have to be generated in ascending order, because they are later consumed sequentially.
In addition, a list of website URLs collected from the web is prepared in url.txt.
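ListUtil itself is not listed in the report; judging from the call ListUtil.getRandomElement(list) in GenerateData below, a minimal sketch of such a helper could look like this (an assumption about its shape, not the original class):
import java.util.List;
import java.util.Random;

public class ListUtil {
    private static final Random RANDOM = new Random();

    // return a random element of the given list; generic, so the same helper
    // serves the ip, url and device lists used by GenerateData
    public static <T> T getRandomElement(List<T> list) {
        return list.get(RANDOM.nextInt(list.size()));
    }
}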
//generate the complete user browsing records
import java.io.*;
import java.util.ArrayList;
import java.util.List;
public class GenerateData {
private static List<String> ips = new ArrayList<>();
private static List<String> dates = new ArrayList<>();
private static List<String> urls = new ArrayList<>();
private static List<String> devices = new ArrayList<>();
public static void main(String[] args) throws IOException {
//data format: IP address  access time  URL  device
prepare();
int len = dates.size(); // number of records to generate (one per date)
StringBuffer buffer = new StringBuffer();
String path = GenerateData.class.getClassLoader().getResource("access0101.log").getPath();
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(new File(path), true)));
for (int i = 0; i < len; i++) { // iterate over all dates, so every sorted date is consumed from earliest to latest
buffer.append(ListUtil.getRandomElement(ips) + "---"); // pick a random ip from the ip pool
buffer.append(dates.get(i) + "---"); // read the dates in order
buffer.append(ListUtil.getRandomElement(urls) + "---"); // pick a random url from the url pool
buffer.append(ListUtil.getRandomElement(devices)); // pick a random device from the device pool
bufferedWriter.write(buffer.toString());
bufferedWriter.newLine(); // line break
buffer.delete(0, buffer.length()); // clear the buffer so the next record can be assembled
}
bufferedWriter.flush();
bufferedWriter.close();
}
private static void prepare() throws IOException {
loadIPs();
loadDates();
loadUrls();
loadDevices();
}
private static void loadDevices() throws IOException {
String path = GenerateData.class.getClassLoader().getResource("device.txt").getPath();
BufferedReader bufferedReader = new BufferedReader(new FileReader(new File(path)));
String device = null;
while ((device = bufferedReader.readLine()) != null){
devices.add(device);
}
}
private static void loadUrls() throws IOException {
String path = GenerateData.class.getClassLoader().getResource("url.txt").getPath();
BufferedReader bufferedReader = new BufferedReader(new FileReader(new File(path)));
String url = null;
while ((url = bufferedReader.readLine()) != null){
urls.add(url);
}
}
private static void loadDates() throws IOException {
String path = GenerateData.class.getClassLoader().getResource("date1.txt").getPath();
BufferedReader bufferedReader = new BufferedReader(new FileReader(new File(path)));
String date = null;
while ((date = bufferedReader.readLine()) != null){
dates.add(date);
}
}
private static void loadIPs() throws IOException {
String path = GenerateData.class.getClassLoader().getResource("ip.txt").getPath();
BufferedReader bufferedReader = new BufferedReader(new FileReader(new File(path)));
String ip = null;
while ((ip = bufferedReader.readLine()) != null){
ips.add(ip);
}
}
}
Step 3: Prepare an access.log file in the resources folder and generate the records one by one. Record format: IP address---access time---URL---device. Each assembled record is appended to the access.log file; the files are named accessXXXX.log to record which day the data belongs to.
Step 4: Append all accessXXXX.log files into one destination file, blog.log, collecting all 7 days of data, and then upload that file to the cluster.
package generateData;
import java.io.*;
public class GenerateLogFile {
public static void main(String[] args) throws IOException, InterruptedException {
String inPath = "D:/kayla/大数据数据仓库/日志文件/access0107.log";
String outPath = "D:/kayla/大数据数据仓库/日志文件/blog.log";
BufferedReader bufferedReader = new BufferedReader(new FileReader(new File(inPath)));
String content = null;
File destFile = new File(outPath);
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(destFile, true)));
while ((content = bufferedReader.readLine()) != null){
bufferedWriter.write(content);
bufferedWriter.newLine();
bufferedWriter.flush();
//Thread.sleep(10);
}
bufferedWriter.close();
}
}
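GenerateLogFile above hard-codes a single day's file in inPath and has to be edited and re-run for each day. A small variation (a sketch only, assuming the same directory layout and the file names access0101.log through access0107.log) could merge all seven days in one run:
import java.io.*;

public class MergeLogs {
    public static void main(String[] args) throws IOException {
        String dir = "D:/kayla/大数据数据仓库/日志文件/"; // same layout as GenerateLogFile above
        try (BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(new File(dir + "blog.log"), true)))) {
            for (int day = 1; day <= 7; day++) {
                // access0101.log ... access0107.log
                String inPath = String.format("%saccess01%02d.log", dir, day);
                try (BufferedReader reader = new BufferedReader(new FileReader(inPath))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        writer.write(line);
                        writer.newLine();
                    }
                }
            }
        }
    }
}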
Finally, upload blog.log to the cluster.
4) Data cleaning – ETL
package etl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
public class ETL {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration(true);
Job job = Job.getInstance(configuration);
job.setJobName("etl");
job.setJarByClass(ETL.class);
//set the input and output paths
TextInputFormat.addInputPath(job, new Path(args[0]));
TextOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(ETLMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//note: no reduce step is needed here; at this point we only need to translate IP addresses into regions
//job.setReducerClass();
job.waitForCompletion(true);
}
}
package etl;
import ip.QQWryExt;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class ETLMapper extends Mapper<Object, Text, Text, NullWritable> {
//this must be a member field, initialized once per mapper; otherwise the job would run for days
QQWryExt ext;
{
try {
ext = new QQWryExt();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//121.76.232.4---2023-01-06 00:00:01---https://mall.kayla.com/item/2397.html---Safari
String line = value.toString();
// indices: 0 (ip), 1 (time), 2 (url), 3 (device)
String[] parts = line.split("---");
String ip = parts[0];
String datetime = parts[1];
String url = parts[2];
String device = parts[3];
QQWryExt.RegionInfo regionInfo = ext.analyticIP(ip);
//re-assemble the fields as ip---regionInfo---time---url---device
String region = regionInfo.getCountry() + ":" + regionInfo.getProvince() + ":" + regionInfo.getCity();
String newLine = ip + "---" + region + "---" + datetime + "---" + url + "---" + device;
context.write(new Text(newLine), NullWritable.get());
}
}
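QQWryExt itself is not included in the report. Judging from how ETLMapper uses it, the class it expects has roughly the following shape (a stub for illustration only; the real class resolves the address against the qqwry.dat IP database and fills in the actual country, province and city):
import java.io.IOException;

// Stub showing only the interface ETLMapper relies on.
public class QQWryExt {
    public QQWryExt() throws IOException {
        // real implementation: open and index the qqwry.dat database here
    }

    public RegionInfo analyticIP(String ip) {
        // real implementation: look the ip range up in the database
        return new RegionInfo("未知", "未知", "未知");
    }

    public static class RegionInfo {
        private final String country;
        private final String province;
        private final String city;

        public RegionInfo(String country, String province, String city) {
            this.country = country;
            this.province = province;
            this.city = city;
        }

        public String getCountry() { return country; }
        public String getProvince() { return province; }
        public String getCity() { return city; }
    }
}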
Package the classes above into a jar and run it on the cluster
hadoop jar bigData-1.0-SNAPSHOT.jar etl.ETL /data /output
hadoop fs -rm -r <path to delete>
hdfs dfs -cat /output/part-r-00000 <!---view the result---->
Result: the output is sorted by IP address in ascending order (the whole rebuilt line is the map output key, so the shuffle sorts the records lexicographically), which means each user's records end up grouped together.
5) Counting the daily PV
Daily page views: pv
The driver (main) class
package kpi.pv;
import kpi.Summary;
import outputFormat.MysqlOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import java.io.IOException;
public class DailyPV {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration(true);
// configuration.set("mapreduce.app-submission.cross-platform", "true");
Job job = Job.getInstance(configuration);
// job.setJar("D:\\ideaProjects\\36th\\mr\\target\\mr-1.0-SNAPSHOT.jar");
job.setJarByClass(DailyPV.class);
job.setJobName("dailyPV");
Path inputPath = new Path(args[0]);
//the final output goes to MySQL, so no output path needs to be configured
TextInputFormat.addInputPath(job, inputPath);
//the MySQL connector jar must first be uploaded to the cluster
job.addArchiveToClassPath(new Path("/lib/mysql/mysql-connector-java-5.1.47.jar"));
job.setMapperClass(DailyPVMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class); //each date is a key, emitted with the value 1
job.setReducerClass(DailyPVReducer.class); //the reducer adds up all values that share the same date
job.setOutputKeyClass(Summary.class); //a custom class is needed because more than two values are output together
job.setOutputValueClass(Text.class); //the value is the computed PV count
//for the custom key class, an output format must be specified explicitly
job.setOutputFormatClass(MysqlOutputFormat.class);
job.waitForCompletion(true);
}
}
The DailyPV Mapper
package kpi.pv;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class DailyPVMapper extends Mapper<Object, Text, Text, IntWritable> {
private Text mapperKey = new Text();
private IntWritable intWritable = new IntWritable(1); //every key is emitted with the value 1
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 106.80.0.169---中国:重庆市:未知---2023-01-04 11:14:31---https://mall.kayla.com/item/2382.html---Android
String line = value.toString();
if(line == null || "".equals(line.trim())){ //skip records that are missing or blank
return;
}
String[] parts = line.split("---"); //split the record on ---
//take the date part of the timestamp, e.g. 2023-01-04 11:14:31
String datetime = parts[2];
String[] s = datetime.split(" ");
//2023-01-04
mapperKey.set(s[0]); //use the date as the key
context.write(mapperKey, intWritable); //one key-value pair is emitted per input record
}
}
The DailyPV Reducer
package kpi.pv;
import kpi.Summary;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class DailyPVReducer extends Reducer<Text, IntWritable, Summary, Text> {
@Override
//Text key, Iterable<IntWritable> values are the shuffled key-value pairs: all values that share a key are grouped together
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
//in effect the reducer outputs three pieces of data, e.g. 2023-01-04 dailyPV 1000000
Summary dailyPV = new Summary(null, key.toString(), "dailyPV", sum + "", null);
context.write(dailyPV, null); //once a date's total is computed, its summary row is written out
}
}
6) Analyzing the daily regional distribution of traffic
Daily regional traffic distribution: area
The area driver class
package kpi.area;
import kpi.Summary;
import outputFormat.MysqlOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import java.io.IOException;
public class DailyArea {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration(true);
Job job = Job.getInstance(conf);
job.setJobName("daily-area");
job.setJarByClass(DailyArea.class);
TextInputFormat.addInputPath(job, new Path(args[0]));
job.addArchiveToClassPath(new Path("/lib/mysql/mysql-connector-java-5.1.47.jar"));
job.setMapperClass(DailyAreaMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(DailyAreaReducer.class);
job.setOutputFormatClass(MysqlOutputFormat.class);
job.setOutputKeyClass(Summary.class);
job.setOutputValueClass(Text.class);
job.waitForCompletion(true);
}
}
The area Mapper class
package kpi.area;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class DailyAreaMapper extends Mapper<Object, Text, Text, Text> {
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 106.80.0.169---中国:重庆市:未知---2023-01-04 11:14:31---https://mall.kayla.com/item/2382.html---Android
String line = value.toString();
String[] parts = line.split("---");
String area = parts[1];
String datetime = parts[2];
String province = area.split(":")[1];
String date = datetime.split(" ")[0];
context.write(new Text(province), new Text(date));
}
}
The area Reducer class
package kpi.area;
import kpi.Summary;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.NullWritable;
import java.io.IOException;
public class DailyAreaReducer extends Reducer<Text, Text, Summary, NullWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//count the records for each province; the date also has to be captured, otherwise it would be unknown which day the row refers to
int num = 0;
String date = null; //the values are grouped by the province key, so only the first date seen is kept
for (Text value : values) {
if(num == 0){
date = value.toString();
}
num ++;
}
Summary dailyArea = new Summary(null, date, "dailyArea", num + "", key.toString());
context.write(dailyArea, null);
}
}
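Because the map output key here is just the province, the counts stored in the database cover all seven days, and the date attached is simply the first one encountered for that province. If a true per-day breakdown were wanted, one option (a sketch, not part of the report's code) would be to key the map output by date:province, mirroring what the fsg job does with date:goodsId; the reducer then only has to sum the 1s:
// Hypothetical variant of DailyAreaMapper keyed by "date:province".
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DailyAreaByDayMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] parts = value.toString().split("---");
        String province = parts[1].split(":")[1];
        String date = parts[2].split(" ")[0];
        // a summing reducer then yields one count per day and province
        context.write(new Text(date + ":" + province), one);
    }
}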
7) Counting frequently accessed goods
Frequently searched goods: fsg
The fsg driver class
package kpi.fsg;
import kpi.Summary;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import outputFormat.MysqlOutputFormat;
import java.io.IOException;
public class FrequentlySearchGoods {
public static final int N = 5; //report the top five goods
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration(true);
Job job = Job.getInstance(configuration);
job.setJobName("FSG");
job.setJarByClass(FrequentlySearchGoods.class);
job.addArchiveToClassPath(new Path("/lib/mysql/mysql-connector-java-5.1.47.jar"));
TextInputFormat.addInputPath(job, new Path(args[0]));
job.setMapperClass(FSGMapper.class);
job.setMapOutputKeyClass(Text.class); //the key identifies the goods (emitted as date:goodsId)
job.setMapOutputValueClass(IntWritable.class); //the value is the count (1 per record)
job.setReducerClass(FSGReducer.class);
job.setOutputKeyClass(Summary.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(MysqlOutputFormat.class);
job.waitForCompletion(true);
}
}
The fsg Mapper class
package kpi.fsg;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FSGMapper extends Mapper<Object, Text, Text, IntWritable> {
private IntWritable one = new IntWritable(1);
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 106.80.0.169---中国:重庆市:未知---2023-01-04 11:14:31---https://mall.kayla.com/item/2382.html---Android
String line = value.toString();
String[] parts = line.split("---");
//get the date, e.g. 2023-01-04
String date = parts[2].split(" ")[0];
/*the kind of record we are looking for looks like this:
106.80.0.169---中国:重庆市:未知---2023-01-04 11:14:31---https://mall.kayla.com/item/2382.html---Android
*/
String url = parts[3];
//if the URL does not contain mall.kayla.com/item/ , it is not a record we need to count
if(!url.contains("mall.kayla.com/item/")){
return;
}
int index = url.lastIndexOf("/");
int end = url.lastIndexOf(".");
String goodsId = url.substring(index + 1, end); //goods id
context.write(new Text(date + ":" + goodsId), one); //2023-01-04:goodsId, 1
}
}
The fsg Reducer class
package kpi.fsg;
import kpi.Summary;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.*;
public class FSGReducer extends Reducer<Text, IntWritable, Summary, NullWritable> {
/**
* A TreeMap with the default comparator does not keep duplicate keys, so later entries overwrite earlier ones.
* If two goods happened to have exactly the same access count, one would overwrite the other and distort the result,
* so the TreeMap is given a custom comparator that orders the counts itself and never treats two of them as equal.
*/
private TreeMap<Integer, String> fsg = new TreeMap<>(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
if(o1.intValue() >= o2.intValue()){
return 1;
}
return -1;
}
});
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//key: e.g. 2023-01-04:475, value: the access counts for that goods ---- we keep the top five
//this relies on there being only one reducer (the default)
//whenever the map grows beyond 5 entries, the smallest one is dropped
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
fsg.put(sum, key.toString()); //store the pair inverted (count as the TreeMap key); the TreeMap keeps its keys sorted automatically
if(fsg.size() > FrequentlySearchGoods.N){
//remove the first (smallest) entry
fsg.pollFirstEntry();
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
for (int i = 0; i < FrequentlySearchGoods.N; i++) {
//loop N times, each time polling the last entry, i.e. the current maximum
//a TreeMap is backed by a red-black tree, a self-balancing binary search tree, so the minimum and maximum are cheap to find
Map.Entry<Integer, String> integerStringEntry = fsg.pollLastEntry();
Integer number = integerStringEntry.getKey(); //the count
String text = integerStringEntry.getValue(); //the date:goodsId string
String[] split = text.split(":");
String date = split[0];
String goodsId = split[1];
Summary searchGoods = new Summary(null, date, "FrequentlySearchGoods", number + "", goodsId); //on day date, goods goodsId was accessed number times
context.write(searchGoods, null);
}
}
}
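The point made in the Javadoc above — that a TreeMap with the default comparator would silently drop one of two goods that happen to share the same count — can be checked with a tiny standalone test (illustrative only, not part of the project):
import java.util.TreeMap;

public class TreeMapTieTest {
    public static void main(String[] args) {
        // default TreeMap: equal keys collapse, the second put overwrites the first
        TreeMap<Integer, String> plain = new TreeMap<>();
        plain.put(100, "2023-01-04:2382");
        plain.put(100, "2023-01-04:2397");
        System.out.println(plain.size()); // prints 1

        // comparator that never returns 0: both goods with count 100 survive
        TreeMap<Integer, String> tolerant = new TreeMap<>((o1, o2) -> o1 >= o2 ? 1 : -1);
        tolerant.put(100, "2023-01-04:2382");
        tolerant.put(100, "2023-01-04:2397");
        System.out.println(tolerant.size()); // prints 2
    }
}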
8) The custom Summary class and writing it to the database
The Summary class
package com.cskaoyan.kpi;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
public class Summary implements WritableComparable<Summary> {
//the five fields
private Integer id;
private String date;
private String attribute;
private String value;
private String addition;
//getters and setters for the Summary fields
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getAttribute() {
return attribute;
}
public void setAttribute(String attribute) {
this.attribute = attribute;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public String getAddition() {
return addition;
}
public void setAddition(String addition) {
this.addition = addition;
}
//constructor
public Summary(Integer id, String date, String attribute, String value, String addition) {
this.id = id; //index used when browsing the rows in the database
this.date = date; //date
this.attribute = attribute; //which metric this row holds: daily PV? unique IPs? top-five goods of the day?
this.value = value; //the value of that metric
this.addition = addition; //extra information
}
public Summary() {
}
//because the data is eventually written to the database and has to be moved around the cluster, the Summary class must define its write, readFields and compareTo methods so Hadoop can serialize, deserialize and compare it
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(date);
out.writeUTF(attribute);
out.writeUTF(value);
out.writeUTF(addition);
}
@Override
public void readFields(DataInput in) throws IOException {
this.date = in.readUTF();
this.attribute = in.readUTF();
this.value = in.readUTF();
this.addition = in.readUTF();
}
@Override
public int compareTo(Summary o) {
try {
long t1 = new SimpleDateFormat("yyyy-MM-dd").parse(this.date).getTime(); //parse the String into a Date of the given format, then getTime() converts it to milliseconds
long t2 = new SimpleDateFormat("yyyy-MM-dd").parse(o.getDate()).getTime();
if(t1 > t2){
return 1; //this Summary is later
}else if(t1 < t2){
return -1; //the other Summary o is later
}
} catch (ParseException e) {
e.printStackTrace();
}
return 0;
}
}
The JDBC utility class
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class JDBCUtil {
public static Connection getConnection() {
Connection connection = null;
try {
Class.forName("com.mysql.jdbc.Driver"); //the driver, URL, user and password are hard-coded magic values instead of coming from a Properties file
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/hadoop?characterEncoding=UTF-8", "root", "123456");
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
return connection;
}
}
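The comment in getConnection above notes that the connection details are hard-coded magic values. A variant that reads them from a jdbc.properties file on the classpath could look like the sketch below (the class name JDBCUtil2, the file name and the property keys are my own choices, not part of the project):
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class JDBCUtil2 {
    public static Connection getConnection() {
        try (InputStream in = JDBCUtil2.class.getClassLoader()
                .getResourceAsStream("jdbc.properties")) {
            Properties props = new Properties();
            props.load(in); // expects driver/url/user/password keys
            Class.forName(props.getProperty("driver"));
            return DriverManager.getConnection(
                    props.getProperty("url"),
                    props.getProperty("user"),
                    props.getProperty("password"));
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}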
The custom OutputFormat (tells Hadoop how to write the data into the database)
import com.cskaoyan.kpi.Summary;
import com.cskaoyan.kpi.util.JDBCUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class MysqlOutputFormat extends OutputFormat<Summary, NullWritable> {
//inner class: an OutputFormat must supply a concrete subclass of the abstract RecordWriter,
//and that RecordWriter must implement the write() and close() methods
protected static class MysqlRecordWriter extends RecordWriter<Summary, NullWritable>{
private Connection connection = null;
//constructor
public MysqlRecordWriter(){
connection = JDBCUtil.getConnection();
}
@Override
public void write(Summary summary, NullWritable value) throws IOException, InterruptedException {
PreparedStatement preparedStatement = null;
String sql = "insert into summary values (null,?,?,?,?)";
try {
preparedStatement = connection.prepareStatement(sql);
preparedStatement.setString(1, summary.getDate());
preparedStatement.setString(2, summary.getAttribute());
preparedStatement.setString(3, summary.getValue());
preparedStatement.setString(4, summary.getAddition());
preparedStatement.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
}finally {
if(preparedStatement != null){
try {
preparedStatement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
if(connection != null){
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}//MysqlRecordWriter
@Override
public RecordWriter<Summary, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
return new MysqlRecordWriter();
}
@Override
public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
}
private FileOutputCommitter committer = null;
public static Path getOutputPath(JobContext job) {
String name = job.getConfiguration().get("mapred.output.dir");
return name == null?null:new Path(name);
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
if (committer == null) {
Path output = getOutputPath(context);
committer = new FileOutputCommitter(output, context);
}
return committer;
}
}
9) Installing MySQL on Ubuntu
apt-get update <!---update the package index first---->
apt install mysql-server <!---answer yes---->
mysql_secure_installation <!---start the configuration---->
root@root01:~# mysql_secure_installation
Securing the MySQL server deployment.
Connecting to MySQL using a blank password.
VALIDATE PASSWORD PLUGIN can be used to test passwords
and improve security. It checks the strength of password
and allows the users to set only those passwords which are
secure enough. Would you like to setup VALIDATE PASSWORD plugin?
Press y|Y for Yes, any other key for No: n
Please set the password for root here.
New password: <!---set the password to 123456--->
Re-enter new password:
By default, a MySQL installation has an anonymous user,
allowing anyone to log into MySQL without having to have
a user account created for them. This is intended only for
testing, and to make the installation go a bit smoother.
You should remove them before moving into a production
environment.
Remove anonymous users? (Press y|Y for Yes, any other key for No) : y
Success.
Normally, root should only be allowed to connect from
'localhost'. This ensures that someone cannot guess at
the root password from the network.
Disallow root login remotely? (Press y|Y for Yes, any other key for No) : n
... skipping.
By default, MySQL comes with a database named 'test' that
anyone can access. This is also intended only for testing,
and should be removed before moving into a production
environment.
Remove test database and access to it? (Press y|Y for Yes, any other key for No) : y
- Dropping test database...
Success.
- Removing privileges on test database...
Success.
Reloading the privilege tables will ensure that all changes
made so far will take effect immediately.
Reload privilege tables now? (Press y|Y for Yes, any other key for No) : y
Success.
All done!
root@root01:~# mysql <!---at this point MySQL can be accessed without a password; next, set one---->
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 4
Server version: 5.7.41-0ubuntu0.18.04.1 (Ubuntu)
Copyright (c) 2000, 2023, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> use mysql
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
update user set authentication_string=password(123456) where user='root';
update user set plugin="mysql_native_password";
flush privileges;
exit;
Log into mysql again
mysql -u root -p
use mysql;
update user set authentication_string=password("your password") where user='root';
#e.g. update user set authentication_string = password(123456) where user = 'root';
update user set plugin="mysql_native_password";
flush privileges;
exit;
#create the database and the summary table
create database hadoop charset gbk;
use hadoop;
create table summary (
id int primary key auto_increment,
day varchar(50),
attribute varchar(50),
val varchar(50),
addition varchar(50)
);
Put the MySQL driver mysql-connector-java-5.1.47.jar onto the cluster
hdfs dfs -mkdir /lib/mysql
hdfs dfs -put mysql-connector-java-5.1.47.jar /lib/mysql <!---every MR job needs this driver---->
Memory usage after installing MySQL
10) Results
Each day's data is uploaded to the cluster
cd /home
<!---drag the files into the /home directory, one at a time--->
Create a directory in HDFS to hold the data
hdfs dfs -mkdir /data
hdfs dfs -put access0101.log /data
Put the generated MR jar into /home and run the
kpi.pv.DailyPV, kpi.area.DailyArea and kpi.fsg.FrequentlySearchGoods jobs to process the data and store the results in the database.
The database then contains: (1) the total PV for each of the 7 days; (2) the access counts per region; (3) the top-5 goods ids across the 7 days.
cd /home
#first run the ETL job to turn IP addresses into regions
hadoop jar bigData-1.0-SNAPSHOT.jar etl.ETL /data /etl
#save the generated data locally
hdfs dfs -get /etl/part-r-00000 /home/data0101.txt
#run the DailyPV MapReduce job
hadoop jar bigData-1.0-SNAPSHOT.jar kpi.pv.DailyPV /etl
#run the DailyArea MapReduce job
hadoop jar bigData-1.0-SNAPSHOT.jar kpi.area.DailyArea /etl
#run the FrequentlySearchGoods MapReduce job
hadoop jar bigData-1.0-SNAPSHOT.jar kpi.fsg.FrequentlySearchGoods /etl
use hadoop;
select * from summary;
Finally, run statistics queries on the database to look at the average daily PV, the top-five goods, and the counts per region.
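For reference, the kind of query meant here can also be issued through the JDBCUtil class from section 8; the snippet below computes the average daily PV over the seven dailyPV rows (illustrative only — the equivalent SELECT can just as well be run in the mysql client, and the import/package of JDBCUtil is omitted):
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

public class SummaryQuery {
    public static void main(String[] args) throws Exception {
        try (Connection conn = JDBCUtil.getConnection();
             Statement stmt = conn.createStatement();
             // val is stored as varchar, so cast it to a number before averaging
             ResultSet rs = stmt.executeQuery(
                     "select avg(cast(val as signed)) from summary where attribute = 'dailyPV'")) {
            if (rs.next()) {
                System.out.println("average daily PV: " + rs.getDouble(1));
            }
        }
    }
}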
Common problems and solutions
1. If not all node roles show up, the NameNode needs to be reformatted
1) Stop dfs and yarn
stop-yarn.sh
stop-dfs.sh
2) On every node other than the NameNode, delete the temporary files and the namenode/datanode storage directories
cd /opt/hadoop/
rm -rf tmp <!-- the exact folder depends on what you configured in core-site.xml; in this setup it is the tmp folder under /opt/hadoop -->
cd /var/bigdata/
rm -rf hadoop/
3) Format the NameNode, on the NameNode only; it only counts as successful when "successfully formatted" is printed
hdfs namenode -format
2. The ResourceManager does not show up
Cause: by default the ResourceManager needs port 8031 at startup
Two things can make that port already occupied:
1) Another process on the host is using the port. Fix 1: free the port; this post shows how to find the process occupying it: https://www.linuxprobe.com/linux-port-check.html (if nothing is shown, either no process uses the port or you are not root). Fix 2: assign a different port; this post shows how to change it: https://blog.csdn.net/web15085599741/article/details/123935253
2) YARN was started on the wrong machine; start-yarn.sh must be run on the node configured as the ResourceManager.
3. The HDFS web UI cannot be reached on the Alibaba Cloud servers
Reference link
I created a security group of my own and assigned it to both instances.
The new security group reuses the original group's rules and additionally opens ports 50070 and 8088.
4. No DataNode appears under the NameNode
First stop HDFS
stop-dfs.sh
Reference blogs:
Troubleshooting why a DataNode fails to restart
DataNode fails to start on Hadoop 2.6
2023-03-22 23:19:51,390 WARN org.apache.hadoop.hdfs.server.common.Storage: java.io.IOException: Incompatible clusterIDs in /var/bigdata/hadoop/distribute/dfs/data: namenode clusterID = CID-7703974b-603d-4b98-b1c6-1de62d825141; datanode clusterID = CID-f5afb25c-662d-412c-bc33-be4574a19f8d
The presumed cause is that reformatting the NameNode changed its clusterID while the DataNode's clusterID was not updated to match.
Restarting afterwards solved the problem
start-dfs.sh
5. The job is stuck at "running job"
Reference:
Resolving "INFO ipc.Client: Retrying connect to server" in Hadoop
6. The server's free memory keeps shrinking
#use top to see which processes are using memory
#find the processes with a high share in the %CPU/%MEM columns
kill <pid>
This may also kill the cluster's own processes; in that case shut down the cluster, reboot, and reconnect to the server.
In my case, however, the two servers simply have little memory to begin with, while the data placed on the cluster is fairly large, so running the jobs uses a lot of memory.
7. Errors when writing the results
Possible causes:
1. The MySQL password was never set, i.e. the password-reset step above was skipped.
2. The insert actually succeeded but YARN reports an error anyway (the rows are visible in the database); connecting to MySQL takes some time and the JVM eventually succeeds after several retries. If it really does fail, re-running the jar a few times also gets it through.