博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Reduce Side Join实现
阅读量:6247 次
发布时间:2019-06-22

本文共 9347 字,大约阅读时间需要 31 分钟。

关于reduce边join,其最重要的是使用MultipleInputs.addInputPath这个api对不同的表使用不同的Map,然后在每个Map里做一下该表的标识,最后到了Reduce端再根据标识区分对应的表!

Reduce Side Join Example

User and comment join

In thisexample, we’ll be using theusers and comments tables from the StackOverflow dataset. Storing data in this matter makessense, as storingrepetitive user data witheach comment is unnecessary. Thiswould also makeupdating user information diffi‐ cult. However,having disjoint data sets posesproblems when it comes to associating a comment with the user who wroteit. Through the use of a reduceside join, thesetwo data sets canbe merged together using the userID as the foreign key. In this example, we’ll perform an inner, outer, and antijoin. The choice of which join to execute is set during job configuration.

Hadoop supportsthe ability to use multipleinput data typesat once, allowingyou to create a mapper classand input formatfor each inputsplit from different data sources. This is extremely helpful, because you don’t have to code logic for two different data inputs in the samemap implementation. In the following example, two mapperclasses are created: one for the user data and one for the comments. Each mapper classoutputs the user ID as the foreignkey, and the entire record as the value along with a single character to flag whichrecord came fromwhat set. Thereducer then copiesall values for eachgroup in memory, keepingtrack of whichrecord came fromwhat data set.The records are then joined togetherand output.

 

The following descriptions of eachcode section explainthe solution to the problem.

Problem: Given a set of user information and a list of user’s comments, enrich each comment with the information about the userwho created thecomment.

 

Drivercode.The job configurationis slightly different from the standard configuration due to the user of themultiple input utility. We also set the join type in the jobconfig‐ uration to args[2] so it can be used in the reducer. The relevant piece of the drivercode to use the MultipleInput follows:

...

// Use MultipleInputs to set which inputuses what mapper

// This will keep parsingof each dataset separate froma logical standpoint

// The firsttwo elements of theargs arrayare the two inputs 

MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class,UserJoinMapper.class);

MultipleInputs.addInputPath(job,newPath(args[1]), TextInputFormat.class, CommentJoinMapper.class);

 

job.getConfiguration()..set("join.type", args[2]);

...

 

User mappercode.This mapper parseseach input lineof user dataXML. It grabs theuser ID associated with each record and outputs it along with the entire input  value. It prepends the letter A in front of theentire value. This allows the reducer to know which values came from what data  set.

public static class UserJoinMapper extendsMapper<Object, Text, Text, Text> {

 

private Text outkey =newText();

private Text outvalue =newText();

 

public void map(Object key, Text value, Context context) throwsIOException, InterruptedException {

 

// Parse the input stringinto a nice map

Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());

 

String userId = parsed.get("Id");

 

// The foreign join keyis the userID

outkey.set(userId);

 

// Flag this record for the reducerand then outputoutvalue.set("A" + value.toString()); context.write(outkey, outvalue);

}

}

When you output the value from the map side, the entire record doesn’t have to be sent. This is an opportunity to optimize the join by keepingonly the fields of data you want to join together. It requiresmore pro‐ cessing on the map side, but is worthit in the long run. Also, sincethe foreign key is in the map output key, you don’t need to keep that in the value, either.

 

 

Comment mapper code.This mapper parseseach input line of commentXML. Very sim‐ ilar to the UserJoinMapper,it too grabs the user ID associated with each record and outputs it along with the entire inputvalue. The only different here is that the XML attribute UserId representsthe user that posted to comment, where as Id in theuser  data set is the user ID. Here, this mapper prepends the letter B in front ofthe entire value.

public static class CommentJoinMapper extends Mapper<Object, Text, Text, Text> {

 

private Text outkey =newText();

private Text outvalue =newText();

 

public void map(Object key, Text value, Context context)

throws IOException, InterruptedException {

 

Map<String, String> parsed = transformXmlToMap(value.toString());

 

// The foreign join keyis the userID

outkey.set( parsed.get("UserId"));

 

// Flag this record for the reducerand then outputoutvalue.set("B" + value.toString()); context.write(outkey, outvalue);

}

}

 

Reducer code.The reducer code iterates through all thevalues of each group and looks atwhat each record is tagged with and then puts the record in one of two lists.After all values are binned in either list, the actual join logic is executedusing the two lists. The join logic differs slightly based on the type of join,but always involves iterating through both lists and writing to the Context object.The type of join is pulled from the job configuration in the setup method. Let’s look at the main reduce method before looking at the join logic.

public static class UserJoinReducer extendsReducer<Text, Text, Text, Text> {

 

private staticfinal Text EMPTY_TEXT = Text("");

private Text tmp =newText();

private ArrayList<Text> listA =newArrayList<Text>();

private ArrayList<Text> listB =newArrayList<Text>();

private String joinType =null;

 

public void setup(Context context) {

// Get the type of join fromour configuration

joinType=context.getConfiguration().get("join.type");

}

 

public void reduce(Text key, Iterable<Text> values, Context context)

throws IOException, InterruptedException {

 

// Clear ourlists listA.clear(); listB.clear();

 

// iterate throughall our values,binning each recordbased on what

// it was tagged with.Make sure to remove thetag!

while(values.hasNext()) { tmp=values.next();

if (tmp.charAt(0) == 'A') {

listA.add(new Text(tmp.toString().substring(1)));

} else if (tmp.charAt('0') == 'B') {

listB.add(new Text(tmp.toString().substring(1)));

}

}

 

// Execute our join logicnow that the lists are filled

executeJoinLogic(context);

}

 

private void executeJoinLogic(Context context)

throws IOException, InterruptedException {

...

}

The input data types tothe reducer are two Text objects. The input key isthe foreign join key, which in this example is the user’s ID. The input values associated with the foreign key contain one record from the “users” data set tagged with ‘B’, as well as all the comments the user posted tagged with ‘B’. Any type of data formatting you would want toperform should be done here prior to outputting. For simplicity, the raw XML value from the left data set (users)is output as the key and the raw XML value from the rightdata set (comments) is output as the value.

Next, let’s look at each of the join types. First up is an inner join. If both the lists are not empty, simply performtwo nested forloops and joineach of thevalues together.

if (joinType.equalsIgnoreCase("inner")) {

// If both lists are not empty,join A with B

if (!listA.isEmpty() && !listB.isEmpty()) {

for (Text A : listA) {

for(Text B : listB) { context.write(A, B);

}

}

}

}...

For aleft outer join,if the right list is not empty, join A with B.If the right list is empty, outputeach record of A with an empty string.

... else if(joinType.equalsIgnoreCase("leftouter")) {

// For each entry in A,

for (Text A : listA) {

// If list B is not empty,join A andB

if (!listB.isEmpty()) {

for(Text B : listB) { context.write(A, B);

}

}else{

// Else, outputA by itself

context.write(A, EMPTY_TEXT);

}

}

}...

A rightouter join is very similar, except switching from the check for empty elements fromBto A. If the left list is empty, write records from B withan empty output key.

...else if (joinType.equalsIgnoreCase("rightouter")) {

// For each entry in B,

for (Text B : listB) {

// If list A is not empty,join A andB

if (!listA.isEmpty()) {

for(Text A : listA) { context.write(A, B);

}

} else {

// Else, outputB by itself

context.write(EMPTY_TEXT, B);

}

}

}...

A fullouter join is more complex, in that we want to keep allrecords, ensuring thatwe join records whereappropriate. If list A is not empty, then for everyelement inA, join withB whenthe B listis not empty, or output A by itself. IfA isempty, then just output B.

... else if (joinType.equalsIgnoreCase("fullouter")) {

// If list A is not empty

if (!listA.isEmpty()) {

// For each entry in A

for (Text A : listA) {

// If list B is not empty,join A with B

if (!listB.isEmpty()) {

for(Text B : listB) { context.write(A, B);

}

}else {

// Else, outputA by itself

context.write(A, EMPTY_TEXT);

}

}

} else {

// If list A is empty, just output B

for (Text B : listB) { context.write(EMPTY_TEXT, B);

}

}

}...

For anantijoin, if at least one of the lists is empty, output the recordsfrom the non- empty list with an empty Text object.

... else if(joinType.equalsIgnoreCase("anti")) {

// If list A is empty and B is empty or vice versa

if (listA.isEmpty() ^ listB.isEmpty()) {

 

// Iterate both A and B with null values

// The previous XOR checkwill make sure exactly one of

// these lists is emptyand therefore the list will be skipped

for (Text A : listA) { context.write(A, EMPTY_TEXT);

}

 

for (Text B : listB) { context.write(EMPTY_TEXT, B);

 

}

}

转载地址:http://akmia.baihongyu.com/

你可能感兴趣的文章
Java内存分配全面浅析
查看>>
32、OSPF在帧中继中不同网络类型配置总结
查看>>
git 之CAfile问题
查看>>
EBS exit_form()
查看>>
手动安装Jenkins插件
查看>>
共享主机
查看>>
阿里巴巴Dubbo实现的源码分析
查看>>
exe4j,
查看>>
Mysql查看执行计划
查看>>
SCDPM 2010系列之一——安装
查看>>
cocos2dx学习
查看>>
http加密访问应用
查看>>
vlayout
查看>>
必读 | 什么时候开始准备2019年下半年的考试?
查看>>
JDK安装说明
查看>>
iftop-流量监控安装(脚本)
查看>>
Windows Server2008通过命令行方式添加防火墙规则
查看>>
我的友情链接
查看>>
2013年十大IT趋势预测
查看>>
用PySpider搜集2017年高校招生章程
查看>>