博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
在AWS上自动执行Hadoop计算
阅读量:6257 次
发布时间:2019-06-22

本文共 4251 字,大约阅读时间需要 14 分钟。

Hadoop框架为大数据项目提供了许多有用的工具。但是自己管理它太复杂了。几个月前,我正在使用Cloudera部署Hadoop集群。我发现它仅适用于计算和存储容量不变的架构。将Cloudera这样的工具用于需要扩展的系统是一场噩梦。这就是云技术的用武之地,让我们的生活更轻松。Amazon Web Services(AWS)是此用例的最佳选择。AWS为Hadoop提供了一个名为Elastic Map Reduce(EMR)的托管解决方案。EMR允许开发人员快速启动Hadoop集群,做必要的计算,并终止他们的时候所有的工作完成。为了进一步自动化此过程,AWS为EMR服务提供了SDK。使用它,您可以使用单个命令启动Hadoop任务。我将在下面的示例中展示它是如何完成的。

我将在EMR中的Hadoop集群上执行Spark作业。我的目标是计算amazon.com上大型客户评论数据集的每个星级评分(1-5)的平均评论长度。通常,要执行Hadoop计算,我们需要将所有数据存储在HDFS中。但EMR与S3集成,我们不需要启动数据实例并 为了两分钟的计算而复制大量数据。这种与S3的兼容性是使用EMR的一大优势。许多数据集都是使用S3 分发的,包括我在这个例子中使用的数据集(你可以在这里找到它)。

最初,您应手动启动EMR集群 (使用控制台),以便让AWS为集群映像创建必要的安全组(它们将是我们自动执行脚本所必需的)。为此,请转到EMR服务页面,单击“创建群集”,然后启动具有默认设置的群集。之后,终止它,您将为主实例和从属实例创建两个默认安全组。您还应该创建一个S3存储桶来存储Spark作业执行的结果。

整个自动化解决方案包含两个Python文件。第一个是Spark作业本身(将在集群上执行)。第二个是启动脚本,它将调用EMR并将Spark作业传递给它。该脚本将被执行 在您的机器上本地。您应该安装boto3 Python库以使用AWS SDK。

job.py文件的内容:

导入 系统

进口 pyspark

sc = pyspark。SparkContext(appName = “评论”)

def to_entity(item):

words = item。拆分('\ t')尝试:    rating = int(words [ 7 ])    评论 = 单词 [ 13 ]    返回(评级,len(评论))除了 ValueError:    返回(无,无)

def avg_sec(a,b):return(a [ 0 ] + b,a [ 1 ] + 1)

def avg_comb(a,b):return(a [ 0 ] + b [ 0 ],a [ 1 ] + b [ 1 ])
def avg_eval(a):返回 a [ 0 ] / a [ 1 ]

fileName = 'amazon_reviews_us_Camera_v1_00.tsv.gz'

dirName = 's3:// amazon-reviews-pds / tsv /'
rdd = sc。textFile(dirName + fileName)

outFile = sys。argv [ 1 ]

#过滤跳过标题

评论 = rdd。map(to_entity)。过滤器(拉姆达 X:X [ 0 ] 是 不 无)。坚持()

INIT =(0,0)

结果 = 评论。aggregateByKey(init,avg_sec,avg_comb)。mapValues(avg_eval)
结果。saveAsTextFile(outFile)
launcher.py文件的内容:

导入 boto3

进口 时间
import argparse

parser = argparse。ArgumentParser(description = '在AWS EMR上启动Spark作业')

解析器。add_argument('aws_access_key',metavar = 'ACCESS_KEY',help = 'AWS Access Key')

解析器。add_argument('aws_secret_key',metavar = 'SECRET_KEY',help = 'AWS Secret Key')
解析器。add_argument('aws_region',metavar = 'REGION',help = 'AWS Region')
解析器。add_argument('bucket',metavar = 'BUCKET',help = 'S3 Bucket')
解析器。add_argument('job_file',metavar = 'JOB_FILE',help = 'Spark Job file')
解析器。add_argument('result_folder',metavar = 'RESULT_FOLDER',help = '结果的S3文件夹')
解析器。add_argument('cluster_name',metavar = 'CLUSTER_NAME',help = 'EMR Cluster Name')
解析器。add_argument('key_name',metavar = 'SSH_KEY_NAME',help = 'SSH Key Name')
解析器。add_argument('master_sg',metavar = 'MASTER_SG',help = '主实例组的安全组ID')
解析器。add_argument('slave_sg',metavar = 'SLAVE_SG',help = '从属实例组的安全组ID')

args = 解析器。parse_args()

client = boto3。客户(

'emr',aws_access_key_id = args。aws_access_key,aws_secret_access_key = args。aws_secret_key,region_name = args。aws_region

s3Client = boto3。客户(

's3',aws_access_key_id = args。aws_access_key,aws_secret_access_key = args。aws_secret_key,region_name = args。aws_region

邮票 = STR(INT(时间。时间()))

s3JobFileName = 'job_' + stamp + ' .py '
s3ResultFolderName = args。result_folder + '_' + 戳

s3Client。upload_file(ARGS。job_file,ARGS。桶,s3JobFileName)

响应 = 客户端。run_job_flow(

Name = args。cluster_name,LogUri = 's3:// aws-logs-511622038217-eu-central-1 / elasticmapreduce /',ReleaseLabel = 'emr-5.17.0',Instances = {    ' MasterInstanceType':'m4.large',    'SlaveInstanceType':'m4.large',    'InstanceCount':5,    'Ec2KeyName':args。key_name,    'KeepJobFlowAliveWhenNoSteps' :虚假,    'TerminationProtected':错误,    'HadoopVersion':'2.8.4',    'EmrManagedMasterSecurityGroup':args。master_sg,    'EmrManagedSlaveSecurityGroup':args。slave_sg},步骤 = [    {        '名字':'星火工作',        'ActionOnFailure':'继续',        'HadoopJarStep':{            'Jar':'command-runner.jar',            'Args':[                'spark-submit',                '--deploy-mode',                '集群',                's3://'  +  args。bucket  +  '/'  +  s3JobFileName,                's3://'  +  args。bucket  +  '/'  +  s3ResultFolderName            ]        }    },]应用 = [    {        '名字':'火花'    },]VisibleToAllUsers = True,JobFlowRole = 'EMR_EC2_DefaultRole',ServiceRole = 'EMR_DefaultRole',ScaleDownBehavior = 'TERMINATE_AT_TASK_COMPLETION',EbsRootVolumeSize = 32

打印(“响应:” + str(响应))

由于launcher.py需要许多参数,因此通过包含此命令的模板shell脚本更容易调用它:

python3 launcher.py \

<AWS_KEY_ID> \
<AWS_SECRET> \
<REGION> \
<S3存储桶(已创建)> \
<Spark作业文件(本地)> \
<输出S3 foler名称> \
<CLUSTER_NAME> \
<EC2_SSH_KEY_NAME> \
<SECURITY_GROUP_ID_FOR_MASTER_INSTANCE(已创建)> \
<SECURITY_GROUP_ID_FOR_SLAVE_INSTANCES(已创建)>

转载地址:http://gxxsa.baihongyu.com/

你可能感兴趣的文章
Jackson使用ObjectManage#readValue传入泛型T的问题
查看>>
Python正则表达式中的re.S的作用
查看>>
从零开始构建一个centos+jdk7+tomcat7的docker镜像文件
查看>>
Source Insight 中文注释为乱码解决办法(完美解决,一键搞定)
查看>>
【LoadRunner】安装LoadRunner
查看>>
Linux内存管理 (15)页面迁移
查看>>
在高并发、高负载的情况下,如何给表添加字段并设置DEFAULT值?
查看>>
Cocos2d-x 3.0final 终结者系列教程13-贪食蛇游戏案例(全)
查看>>
Nginx的try_files指令和命名location使用实例
查看>>
IO多路复用之select
查看>>
pd_ds中的hash
查看>>
买书不读是一种什么病?
查看>>
微信接口开发报错invalid credential, access_token is invalid or not latest hint
查看>>
nohup 部署springboot 使用命令
查看>>
MQ产品比较-ActiveMQ-RocketMQ
查看>>
暂时没有想好呢。
查看>>
windows服务 MVC之@Html.Raw()用法 文件流的读写 简单工厂和工厂模式对比
查看>>
PHP解析URL并得到URL中的参数
查看>>
【vue.js】绑定click事件
查看>>
字体属性
查看>>