Skip to content

Commit

Permalink
!3 mapReduce手机号文件解析Demo提交
Browse files Browse the repository at this point in the history
* 手机号excel文件校验并入库--map 测试
* 手机号excel文件校验--map reduce 测试
  • Loading branch information
jcwang812 authored and open-snail committed Jul 1, 2024
1 parent ccfdc14 commit 53306f0
Show file tree
Hide file tree
Showing 11 changed files with 472 additions and 1 deletion.
10 changes: 9 additions & 1 deletion docs/demo.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,12 @@ CREATE TABLE fail_order
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`)
);
);

-- 手机号表
CREATE TABLE `phone_number` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
`phone_number` varchar(20) NOT NULL DEFAULT '' COMMENT '手机号',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='手机号表'
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@
<artifactId>okhttp</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
</dependencies>

<build>
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/com/example/snailjob/bo/PhoneNumberBo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.example.snailjob.bo;

import com.alibaba.excel.annotation.ExcelProperty;
import lombok.Data;

/**
* excel表格手机号BO
*
* @author JiChenWang
* @since 2024/6/27 20:28
*/
@Data
public class PhoneNumberBo {

@ExcelProperty(value = "手机号码", index = 0)
private String phoneNumber;

}
40 changes: 40 additions & 0 deletions src/main/java/com/example/snailjob/bo/PhoneNumberCheckBo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.example.snailjob.bo;

import com.example.snailjob.po.PhoneNumberPo;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.ArrayList;
import java.util.List;

/**
* 手机号检测BO
*
* @author JiChenWang
* @since 2024/6/27 20:50
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PhoneNumberCheckBo {

@Schema(description = "检测总条数", accessMode = Schema.AccessMode.READ_WRITE)
private Long checkTotalNum = 0L;

@Schema(description = "检测失败条数", accessMode = Schema.AccessMode.READ_WRITE)
private Long checkErrorNum = 0L;

@Schema(description = "检测成功条数", accessMode = Schema.AccessMode.READ_WRITE)
private Long checkSuccessNum = 0L;

@Schema(description = "检测失败临时的数据", accessMode = Schema.AccessMode.READ_WRITE)
private List<String> checkErrorPhoneNumberList = new ArrayList<>();

@Schema(description = "检测成功临时的数据", accessMode = Schema.AccessMode.READ_WRITE)
private List<PhoneNumberPo> checkSuccessPhoneNumberList = new ArrayList<>();

}
15 changes: 15 additions & 0 deletions src/main/java/com/example/snailjob/dao/PhoneNumberBaseMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.example.snailjob.dao;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.snailjob.po.PhoneNumberPo;
import org.springframework.stereotype.Repository;

/**
* 手机号mapper
*
* @author JiChenWang
* @since 2024/6/30 11:55
*/
@Repository
public interface PhoneNumberBaseMapper extends BaseMapper<PhoneNumberPo> {
}
34 changes: 34 additions & 0 deletions src/main/java/com/example/snailjob/dao/PhoneNumberDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.example.snailjob.dao;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.snailjob.po.PhoneNumberPo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

/**
* TODO
*
* @author JiChenWang
* @since 2024/6/30 11:58
*/
@Service
public class PhoneNumberDao extends ServiceImpl<PhoneNumberBaseMapper, PhoneNumberPo> {

@Autowired
private PhoneNumberBaseMapper phoneNumberBaseMapper;

/**
* 批量保存手机号信息
*
* @param phoneNumberPoList 手机号po列表
* @return Boolean 保存成功标识:true-成功、false-失败
* @author JichenWang
* @since 2024/6/30 12:03
*/
public Boolean insertBatch (List<PhoneNumberPo> phoneNumberPoList) {
return this.saveBatch(phoneNumberPoList);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.example.snailjob.job;

import cn.hutool.core.util.ObjectUtil;
import com.aizuda.snailjob.client.job.core.MapHandler;
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
import com.aizuda.snailjob.client.job.core.annotation.MapExecutor;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.alibaba.excel.EasyExcel;
import com.example.snailjob.bo.PhoneNumberBo;
import com.example.snailjob.bo.PhoneNumberCheckBo;
import com.example.snailjob.dao.PhoneNumberDao;
import com.example.snailjob.listener.PhoneNumberExcelListener;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.InputStream;
import java.util.List;

/**
* 解析手机号excel文件,并将正确的手机号分片入库
*
* @author JiChenWang
* @since 2024/6/30 10:37
*/
@Slf4j
@Component
@JobExecutor(name = "testExcelAnalyseMapJobExecutor")
public class TestExcelAnalyseMapJobExecutor {

private final Integer BATCH_SIZE = 100;

@Autowired
private PhoneNumberDao phoneNumberDao;

/**
* 读取手机号文件总行数,并进行分组
* 比如文档中的手机号总量为307条,每100条一个分组,分组结果为[{0,99}, {100, 199}, {200,299}, {300, 307}]
*
* @return ExecuteResult
* @author JichenWang
* @since 2024/6/30 10:48
*/
@MapExecutor
public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
List<List<Long>> ranges = null;
// 先获取文件总行数,便于分组
try {
@Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx");
final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo();
PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, true, BATCH_SIZE);
EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(1).doReadSync();

// 设置区间范围
ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getCheckTotalNum(), BATCH_SIZE);
} catch (Exception e) {
log.error("文件读取异常", e.getMessage());
}
return mapHandler.doMap(ranges, "MONTH_MAP");

}

/**
*
*
* @param mapArgs
* @return ExecuteResult
* @author JichenWang
* @since 2024/6/30 11:05
*/
@MapExecutor(taskName = "MONTH_MAP")
public ExecuteResult monthMapExecute(MapArgs mapArgs) {
// 获取本次要处理的区间
final List<Integer> mapResult = (List<Integer>) mapArgs.getMapResult();
log.info("本次要处理的区间为:{}", mapResult);

// 按照处理区间,去读取数据
final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo();
try {
@Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx");
PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, false, BATCH_SIZE);
EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(mapResult.get(0) + 1).doReadSync();
} catch (Exception e) {
log.error("文件读取异常:", e.getMessage());
}

// 如果正确手机号不为空,则入库
if (ObjectUtil.isNotEmpty(phoneNumberCheckBo.getCheckSuccessPhoneNumberList())) {
phoneNumberDao.insertBatch(phoneNumberCheckBo.getCheckSuccessPhoneNumberList());
}

return ExecuteResult.success(phoneNumberCheckBo.getCheckSuccessNum());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package com.example.snailjob.job;

import com.aizuda.snailjob.client.job.core.MapHandler;
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
import com.aizuda.snailjob.client.job.core.annotation.MapExecutor;
import com.aizuda.snailjob.client.job.core.annotation.MergeReduceExecutor;
import com.aizuda.snailjob.client.job.core.annotation.ReduceExecutor;
import com.aizuda.snailjob.client.job.core.dto.MapArgs;
import com.aizuda.snailjob.client.job.core.dto.MergeReduceArgs;
import com.aizuda.snailjob.client.job.core.dto.ReduceArgs;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.alibaba.excel.EasyExcel;
import com.alibaba.fastjson.JSONArray;
import com.example.snailjob.bo.PhoneNumberBo;
import com.example.snailjob.bo.PhoneNumberCheckBo;
import com.example.snailjob.listener.PhoneNumberExcelListener;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

/**
* 解析校验Excel中的手机号,统计出错手机号数量,并返回错误手机号详情
*
* @author JichenWang
* @since 2024/6/27 19:52
*/
@Slf4j
@Component
@JobExecutor(name = "TestExcelAnalyseMapReduceJobExecutor")
public class TestExcelAnalyseMapReduceJobExecutor {

private final Integer BATCH_SIZE = 100;

/**
* 处理手机号文件信息,将文档中的手机号进行分组
* 比如文档中的手机号总量为307条,每100条一个分组,分组结果为[{0,99}, {100, 199}, {200,299}, {300, 307}]
*
* @return ExecuteResult
* @author JichenWang
* @since 2024/6/29 14:03
*/
@MapExecutor
public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
List<List<Long>> ranges = null;
// 先获取文件总行数,便于分组
try {
@Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx");
final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo();
PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, true, BATCH_SIZE);
EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(1).doReadSync();

// 设置区间范围
ranges = TestMapReduceJobExecutor.doSharding(0L, phoneNumberCheckBo.getCheckTotalNum(), BATCH_SIZE);
} catch (Exception e) {
log.error("文件读取异常", e.getMessage());
}
return mapHandler.doMap(ranges, "MONTH_MAP");
}

/**
* 处理每个分组内容,如读取{0,99}区间的手机号,并解析
*
* @return ExecuteResult
* @author JichenWang
* @since 2024/6/29 14:04
*/
@MapExecutor(taskName = "MONTH_MAP")
public ExecuteResult monthMapExecute(MapArgs mapArgs) {
// 获取本次要处理的区间
final List<Integer> mapResult = (List<Integer>) mapArgs.getMapResult();
log.info("本次要处理的区间为:{}", mapResult);

// 按照处理区间,去读取数据
final PhoneNumberCheckBo phoneNumberCheckBo = new PhoneNumberCheckBo();
try {
@Cleanup InputStream numberInputStream = getClass().getClassLoader().getResourceAsStream("doc/number.xlsx");
PhoneNumberExcelListener phoneNumberExcelListener = new PhoneNumberExcelListener(phoneNumberCheckBo, false, BATCH_SIZE);
EasyExcel.read(numberInputStream, PhoneNumberBo.class, phoneNumberExcelListener).sheet().headRowNumber(mapResult.get(0) + 1).doReadSync();
} catch (Exception e) {
log.error("文件读取异常:", e.getMessage());
}

return ExecuteResult.success(phoneNumberCheckBo);
}


@ReduceExecutor
public ExecuteResult reduceExecute(ReduceArgs mapReduceArgs) {
log.info("WJC Test reduceExecute, 参数为:{}", mapReduceArgs.getMapResult());
final PhoneNumberCheckBo phoneNumberCheckBo = this.buildGatherPhoneNumberCheckBo(mapReduceArgs.getMapResult().toString());
return ExecuteResult.success(phoneNumberCheckBo);
}

/**
* 当只有一个reduce任务时无此执行器
*/
@MergeReduceExecutor
public ExecuteResult mergeReduceExecute(MergeReduceArgs mergeReduceArgs) {
final PhoneNumberCheckBo phoneNumberCheckBo = this.buildGatherPhoneNumberCheckBo(mergeReduceArgs.getReduces().toString());
log.info("WJC 最终检测结果为:{}", phoneNumberCheckBo);
return ExecuteResult.success(phoneNumberCheckBo);
}

/**
* 构造汇总手机号校验结果BO
*
* @param phoneNumberCheckBoStr 手机号校验BO字符串
* @return PhoneNumberCheckBo 汇总手机号校验结果BO
* @author JichenWang
* @since 2024/6/29 14:24
*/
private PhoneNumberCheckBo buildGatherPhoneNumberCheckBo(String phoneNumberCheckBoStr) {
final List<PhoneNumberCheckBo> phoneNumberCheckBoList = JSONArray.parseArray(phoneNumberCheckBoStr, PhoneNumberCheckBo.class);
// 获取校验总数
final long checkTotalNum = phoneNumberCheckBoList.get(0).getCheckTotalNum();
// 汇总校验失败数量
final long checkErrorNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getCheckErrorNum).sum();
// 汇总校验成功数量
final long checkSuccessNum = phoneNumberCheckBoList.stream().mapToLong(PhoneNumberCheckBo::getCheckSuccessNum).sum();
// 汇总错误手机号
final List<String> errorPhoneNumberList = new ArrayList<>();
phoneNumberCheckBoList.forEach(item -> errorPhoneNumberList.addAll(item.getCheckErrorPhoneNumberList()));

// 汇总手机号校验结果
return PhoneNumberCheckBo.builder().checkTotalNum(checkTotalNum).checkErrorNum(checkErrorNum).checkSuccessNum(checkSuccessNum).checkErrorPhoneNumberList(errorPhoneNumberList).build();
}

}
Loading

0 comments on commit 53306f0

Please sign in to comment.