Amazon EMR 云计算服务编程实践
EMR是Amazon亚马逊云计算平台提供的一项服务,用户可以在此平台使用亚马逊强大的计算资源执行Map Reduce程序。由于Map Reduce很多情况下都是做的海量数据文本统计类的并行计算任务,需要耗费很多时间,使用云计算则可以大大加快执行速度。
EMR建立在Amazon S3和Amazon EC2的 基础上。用户提交一个map程序和一个reduce程序,同时提交需要处理的数据文件作为输入。这些都上传到Amazon S3云端存储平台,在EMR中指定相应的S3路径,就可以开始做数据处理了。EMR会根据用户指定的规模配置,开启一个EC2集群,在每个节点上运行 Hadoop。运行结束后用户可以从S3获取结果数据。下面以word count单词统计任务为例,介绍具体操作过程。
1. 登录Amazon AWS控制台,选择新建任务(点击查看大图)
2. 在运行Map Reduce任务前,我们需要先将Map和Reduce程序,以及待分析的输入文件上传到S3中
3. 回到新建任务上来,输入任务信息
4. 程序位置、输入文件位置等信息,这些文件都预先上传到了S3里
Mapper和Reducer程序,请参考Word Count。
5. 配置EC2实例相关参数
6. 此步可跳过
7. 所有任务信息汇总确认
8. 监控Map Reduce任务运行状态
9. 任务执行完毕
10. 从S3中查看或下载输出结果
Amazon的云计算平台提供了很多API,使得开发者能根据自己的需要与云端交互。boto(Amazon官方介绍)就是一个python API,作为一个脚本语言,使用python可以以简短的代码写出原型程序,快速实现或检验自己的想法。下面是使用boto与Amazon EMR交互的例子。
#!/usr/bin/python # # Amazon EMR Interface # A program to run and monitor streaming job flow on Amazon EMR import time connected = 0 jobid =0 def connect(): access_key = raw_input('Your access key:').strip() secret_key = raw_input('Your secret key:').strip() from boto.emr.connection import EmrConnection global conn conn = EmrConnection(access_key, secret_key) global connected connected = 1 def run(): if connected == 0: print 'Not connected!' elif connected == 1: s_mapper = raw_input('Path of the mapper:').strip() s_reducer = raw_input('Path of the reducer:').strip() s_input = raw_input('Path of input files:').strip() s_output = raw_input('Path for storing output files:').strip() from boto.emr.step import StreamingStep step = StreamingStep(name='My steps', mapper=s_mapper, reducer=s_reducer, input=s_input, output=s_output) jf_name = raw_input('Name of job flow:').strip() jf_log = raw_input('Path of logs:').strip() instance_type = raw_input('Type of instance:').strip() instance_num = raw_input('Number of instance:').strip() global jobid jobid = conn.run_jobflow(name=jf_name,log_uri=jf_log, steps=[step], slave_instance_type=instance_type,num_instances=int(instance_num)) print 'jobid = '+ jobid while True: time.sleep(5) status = conn.describe_jobflow(jobid) print status.state if 'ENDED' in status.state: break def showMenu(): title = ''' Amazon EMR Service connect Connect to Amazon EMR run Input job flow info and run on Amazon EMR quit Quit Enter choice:''' while True: choice = raw_input(title).strip().lower() choices = ['connect','run','quit'] if choice not in choices: print('Input Error!') else: if choice == 'quit': break elif choice == 'connect': connect() elif choice == 'run': run() if __name__ == '__main__': showMenu() |