引用请声明原文:http://blog.csdn.net/duck_genuine/article/details/9155705
由于引用数据以hash的方式放在不同的文件里需要将其合并排序写到一个文件。数据量暂时是有几千万级别。
文件的每行是一条json格式的记录,格式如下:
{ "_id" : { "$oid" : "51ace243bb15094b6c40ada5" }, "count" :1, "domain" : "qq.com", "vid" : "92312592" }
将这个过程的实现做成一个类似MapReduce的过程:
mapSplit--->shuffler-->sort-->meger
然后将处理好的文件加载到内存,放到list集合中,再使用二分查找方式,不使用map,是因为map占用的内存比实际数据更大。
- 先看一下程序的入口代码
package com.wole.indexing.refer;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author yzygenuine
*
* 整合map,shuffler,merge程序,定时将数据处理写到指定文件中
*/
public class MapReduceDriver {
private static final Logger logger = LoggerFactory.getLogger(MapReduceDriver.class);
private static final Logger mail_log = LoggerFactory.getLogger("mail_log");
public static void main(String[] args) {
logger.info("starting.....");
long start=System.currentTimeMillis();
String inputDir = args[0];
File dir = new File(inputDir);
if (!dir.exists()) {
logger.warn("目录不存在");
System.exit(-1);
}
Collection<File> sourceFile = FileUtils.listFiles(dir, new String[] { "json" }, false);
logger.info("sourceFiles size:" + sourceFile.size());
logger.info("开始mapSplit");
for (File f : sourceFile) {
logger.info("mapSplit:"+f.getAbsolutePath());
MapSplit map = new MapSplit();
map.setInputFile(f);
map.setOutputFile(new File(f.getAbsolutePath() + ".map"));
map.setBatchSize(5000);
map.init();
map.map();
}
logger.info("完成mapsplit");
Collection<File> mapFile = FileUtils.listFiles(dir, new String[] { "map" }, false);
for (File f : mapFile) {
logger.info("shuffler:"+f.getAbsolutePath());
Shuffler shuffler=new Shuffler();
shuffler.toShuffler(f);
}
logger.info("完成shuffler");
logger.info("开始merge.......");
String mergerFile="merger_refer";
MergeFile merge=new MergeFile(new File(dir,mergerFile));
Collection<File> shufflerFile = FileUtils.listFiles(dir, new String[] { "shuffler" }, false);
try {
merge.merge(shufflerFile);
} catch (IOException e) {
logger.error("",e);
}
logger.info("merge结束.......");
long end = System.currentTimeMillis();
long total=(end-start)/1000;
logger.info("总共耗时:"+total);
logger.info("end!");
}
}
- mapSplit的实现:
package com.wole.indexing.refer;
import gnu.trove.map.hash.TIntIntHashMap;
import gnu.trove.map.hash.TIntObjectHashMap;
import gnu.trove.procedure.TIntIntProcedure;
import gnu.trove.procedure.TIntObjectProcedure;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.chenlb.solr.indexing.DataSource;
import com.wole.indexing.util.Occupy;
/**
* @author yzygenuine
*
*/
public class MapSplit{
private static final Logger logger = LoggerFactory.getLogger(MapSplit.class);
private static final Logger mail_log = LoggerFactory.getLogger("mail_log");
private File inputFile;
private File outputFile;
private LineIterator lineIter;
private Map<String, Integer> domainMapNum = new HashMap<String, Integer>();
private int batchSize = 5000;
public boolean hasNext() {
return lineIter.hasNext();
}
public void init() {
logger.info("初始化");
try {
LineIterator lineIter = FileUtils.lineIterator(inputFile);
if (lineIter != null) {
this.lineIter = lineIter;
}
} catch (Exception e) {
logger.error("", e);
mail_log.info("from "+MapSplit.class.toString());
mail_log.error("文件是否配置有问题,读取失败");
throw new RuntimeException("文件是否配置有问题,读取失败");
}
domainMapNum.put("renren.com", 1);
domainMapNum.put("qq.com", 2);
}
private TIntObjectHashMap<TIntIntHashMap> vidMap = new TIntObjectHashMap<TIntIntHashMap>();
/**
* 统计总共加载的数据记录数
*/
private int count = 0;
private int bigDomainCount = 0;
public void nextData() {
while (lineIter.hasNext()) {
String line = lineIter.nextLine();
try {
JSONObject json = new JSONObject(line);
Integer obj_count = json.getInt("count");
String obj_domain = json.getString("domain");
// 处理domain,只取后缀,找出对应的num
int obj_vid = json.getInt("vid");
TIntIntHashMap domain_count_map = vidMap.get(obj_vid);
if (domain_count_map == null) {
domain_count_map = new TIntIntHashMap();
vidMap.put(obj_vid, domain_count_map);
}
// 查找并统计
int domainNum = 0;
Iterator<String> domainIter = domainMapNum.keySet().iterator();
while (domainIter.hasNext()) {
String key = domainIter.next();
if (obj_domain.indexOf(key) != -1) {
domainNum = domainMapNum.get(key);
break;
}
}
int domainCount = domain_count_map.get(domainNum);
domainCount += obj_count;
domain_count_map.put(domainNum, domainCount);
} catch (JSONException e) {
logger.error("", e);
}
count++;
if (count % batchSize == 0) {
break;
}
}
logger.info("count:" + count);
}
final Set<String> domainSet = new HashSet<String>();
public void map() {
while (hasNext()) {
nextData();
}
// 写入文件
try {
outputFile();
} catch (IOException e) {
logger.error("", e);
}
}
final static String separator = System.getProperty("line.separator");
private void outputFile() throws IOException {
final String split = ",";
final BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(outputFile));
vidMap.forEachEntry(new TIntObjectProcedure<TIntIntHashMap>() {
@Override
public boolean execute(int vid, TIntIntHashMap domain_count_map) {
final StringBuilder sb = new StringBuilder();
sb.append(vid);
sb.append(split);
domain_count_map.forEachEntry(new TIntIntProcedure() {
@Override
public boolean execute(int domainNum, int domain_count) {
sb.append(domainNum);
sb.append(":");
sb.append(domain_count);
sb.append(split);
return true;
}
});
sb.append(separator);
String line = sb.toString();
try {
out.write(line.getBytes("utf-8"), 0, line.length());
} catch (Exception e) {
logger.error("", e);
}
return true;
}
});
try {
out.close();
} catch (Exception e) {
logger.error("", e);
}
logger.info("写文件完成");
}
public void setInputFile(File inputFile) {
this.inputFile = inputFile;
}
public void setOutputFile(File outputFile) {
this.outputFile = outputFile;
}
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
public static void main(String[] args) {
String inputFile = args[0];
String outputFile = args[1] + ".map";
MapSplit map = new MapSplit();
map.setInputFile(new File(inputFile));
map.setOutputFile(new File(outputFile));
map.setBatchSize(5000);
map.init();
map.map();
}
}
- shuffler实现代码如下:
package com.wole.indexing.refer;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.dsa.Sort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.wole.indexing.util.Occupy;
/**
* @author yzygenuine
*中间结果排序
*/
public class Shuffler {
private static final Logger logger = LoggerFactory.getLogger(MapSplit.class);
private static final Logger mail_log = LoggerFactory.getLogger("mail_log");
public void toShuffler(File inputFile) {
LineIterator iter = null;
try {
iter = FileUtils.lineIterator(inputFile);
} catch (IOException e) {
logger.error("", e);
}
if (iter == null) {
return;
}
//读取文件到list集合
List<VidInfo> list = new ArrayList<VidInfo>();
while (iter.hasNext()) {
String line = (String) iter.next();
VidInfo vidInfo=VidInfo.lineToVidInfo(line);
list.add(vidInfo);
}
//将数组做排序
VidInfo[] array = new VidInfo[list.size()];
list.toArray(array);
Sor
聚类属于无监督学习。
聚类的算法有很多种,其可分为基于划分、层次、密度、网格及模型的聚类方法。
根据数据集的不同,需要采用不同的聚类算法和策略。
1. 选择聚类算法,所面临的常见问题又哪些?
1) 不同形状的数据集。不同形状的数据集,也需要采取不同的度量策略,或者不同的聚类算法。
2)不同的数据次序。相同数据集,但数据输入次序不同,也会造成聚类的结果的不同。
3)噪声。不同的算法,对噪声的敏感程度不同。
2. 在高维的欧式空间,什么是“维数灾难”?
在高维下,所有点对的距离都差不多(如欧式距离),或者是几乎任意两个向量都是正交(利用夹角进行进行度量),这样聚类就很困难。
3. 常见的聚类算法的策略有哪些?
1)层次或凝聚式聚类。采取合并的方式,将邻近点或簇合并成一个大簇。
2)点分配。每次遍历数据集,将数据分配到一个暂时适合的簇中,然后不断更新。
4. 层次聚类算法的复杂度是多少?
每次合并,都需计算出两个点对之间的距离,复杂度是O(n^2), 后续步骤的开销,分布正比与O((n-1)^2), O((n-2)^2)...,这样求和算下来,算法复杂度是O(n^3).
算法优化:采用优先队列/最小堆来优化计算。优先队列的构建,第一步需要计算出每两个点的距离,这个开销是O(N^2). 一般情况下,N个元素,单纯的优先队列的构建开销为O(N),若是N^2个距离值,则建堆的开销是O(N^2)。
第二步,合并,合并需要一个删除、计算和重新插入的过程。因为合并一个簇对,就需要更新N个元素,开销为O(N*logN)。总的开销为O((N^2) * logN).
所以,总的算法复杂度为O((N^2) * logN).
5. 欧式空间与非欧式空间下,常见的簇之间的距离度量有哪些?
欧式空间:
1)两个簇之间的质心之间的距离最小
2)两个簇中所有点之间的最短距离
3)两个簇之间所有点对的平均距离
4)将具有最小半径的两个簇进行合并, 簇的半径:簇内的点到质心的最大距离
5)将具有最小直径的两个簇进行合并,簇的直径:簇内任意两点间的最大距离
非欧式空间,簇的中心点定义,该点距离其他点的距离最近,如何计算?
1)该点到簇中其他所有点的距离之和(求和),1-范数
2)该点到簇中其他点的最大距离(最大值),无穷-范数
3)该点到簇中其他点的平方和(平方和),2-范数
6. k-means算法,k均值算法
点分配式的聚类算法。一般用于球形或凸集的数据集。
算法步骤如下:
1)初始化k个选择点作为最初的k个簇的中心
2)计算每个点分别到k个簇的中心,并将点分配到其距离最近的簇中
3)由分配的点集,分别更新每个簇的中心,然后回到2,继续算法,直到簇的中心变化小于某个阈值
7. k-means算法的两个问题?
1)初始化选择点;常用的方式是尽量选择距离比较远的点(方法:依次计算出与已确定的点的距离,并选择距离最大的点),或者首先采取层次聚类的方式找出k个簇
2)如何选取k值;k值选取不当,会导致的问题?当k的数目低于真实的簇的数目时,平均直径或其他分散度指标会快速上升
可以采用多次聚类,然后比较的方式。多次聚类,一般是采用1, 2, 4, 8...数列的方式,然后找到一个指标在v/2, v时,获取较好的效果,然后再使用二分法,在[v/2, v]之间找到最佳的k值。
8. CURE算法
使用场景:
任何形状的簇,如S形、环形等等,不需要满足正态分布,欧式空间,可以用于内存不足的情况
特征:
簇的表示不是采用质心,而是用一些代表点的集合来表示。
算法步骤:
1)初始化。抽取样本数据在内存中进行聚类,方法可以采用层次聚类的方式,形成簇之后,从每个簇中再选取一部分点作为簇的代表点,并且每个簇的代表点之间的距离尽量远。对每个代表点向质心移动一段距离,距离的计算方法:点的位置到簇中心的距离乘以一个固定的比例,如20%。
2)对簇进行合并。当两个簇的代表点之间足够近,那么就合并这两个簇,直到没有更足够接近的簇。
3)点分配。对所有点进行分配,即将点分配给与代表点最近的簇。
9. GRGPF算法
场景:
非欧式空间,可用于内存不足的情况(对数据抽样)
特征:
同时使用了层次聚类和点分配的的思想。
如何表示簇?
数据特征:簇包含点的数目,簇中心点,离中心点最近的一些点集和最远的一些点集,ROWSUM(p)即点p到簇中其他店的距离平方和。靠近中心的点集便于修改中心点的位置,而远离中心的点便于对簇进行合并。
簇的组织:类似B-树结构。首先,抽取样本点,然后做层次聚类,就形成了树T的结构。然后,从树T中选取一系列簇,即是GRGPF算法的初始簇。然后将T中具有相同祖先的簇聚合,表示书中的内部节点。
点的分配:对簇进行初始化之后,将每个点插入到距离最近的那个簇。
具体处理的细节更为复杂,如果对B-树比较了解,应该有帮助。
10. 流聚类,如何对最近m个点进行聚类?
N个点组成的滑动窗口模型,类似DGIM算法中统计1的个数。
1)首先,划分桶,桶的大小是2的次幂,每一级桶的个数最多是b个。
2)其次,对每个桶内的数据进行聚类,如采用层次聚类的方法。
3)当有新数据来临,需要新建桶,或者合并桶,这个类似于GDIM,但除了合并,还需要合并簇,当流内聚类的模型变化不是很快的时候,可以采取直接质心合并的方式。
4)查询应答:对最近的m个点进行聚类,当m不在桶的分界线上时,可以采用近似的方式求解,只需求出 包含m个点的最少桶 的结果
插入排序:
对于小型的排序任务速度很快,它是稳定的,只需要O(1)的额外空间,基于比较和交换的次数为O(n^2)。
#include <iostream>
#include <string>
#include <cstdio>
#include <cmath>
#include <vector>
#include <algorithm>
#include <sstream>
#include <cstdlib>
#include <fstream>
#include <queue>
using namespace std;
int x[8]={55,41,59,26,53,58,97,93};
int main()
{
//ifstream fin;
//fin.open("data1.txt");
for(int i=1;i<8;i++)
for(int j=i;j>0 && x[j]<x[j-1];j-- )
{
int t=x[j];
x[j]=x[j-1];
x[j-1]=t;
}
for(int i=0;i<8;i++)cout<<x[i]<<" ";
cout<<endl;
return 0;
}
快速排序:
如果n很大,快速排序的O(n*logn)的运行时间就很关键了,在结合随机划分和双向划分后,对于任意的n元输入数组,快排的期望运行时间正比于 n logn。
下面代码的版本是基于第一个元素进行划分,对于随机输入的数据这是没有问题的,但是对于某些常见输入,比如数组已基本有序,那么最坏情况下需要
O(n^2)的时间,这时候,我们可以才用随机划分元素的方法,可以改善性能,通过把第一个元素和后面所有元素中的一个随机项交换来实现这一点:
#include <iostream>
#include <string>
#include <cstdio>
#include <cmath>
#include <vector>
#include <algorithm>
#include <sstream>
#include <cstdlib>
#include <fstream>
#include <queue>
using namespace std;
int x[8]={55,41,59,26,53,58,97,93};
void qsort1(int l,int u)
{
if(l>=u)return;
int m=l;
for(int i=l+1;i<=u;i++)
{
if(x[i]<x[l]){
m++;
int t=x[m];
x[m]=x[i];
x[i]=t;
}
}
int t=x[l];
x[l]=x[m];
x[m]=t;
qsort1(l,m-1);
qsort1(m+1,u);
}
int main()
{
qsort1(0,7);
for(int i=0;i<8;i++)cout<<x[i]<<" ";
cout<<endl;
return 0;
}
顺便说一下:C库函数qsort非常简单相对较快,但是它比我们自己写的快排慢,仅仅是因为其通用而灵活的接口对每次比较都使用函数调用,C++库函数sort具有最简单的
接口:我们通过调用sort(x,x+n)来对数组x排序,实现也很高效。