pythonhadoop教程_PythonAPI操作Hadoophdfs详解1:安装
由于是windows环境(linux其实也⼀样),只要有pip或者setup_install安装起来都是很⽅便的pip install hdfs
2:Client——创建集连接from hdfs import *
其他参数说明:classhdfs.client.Client(url, root=None, proxy=None, timeout=None, session=None)
url:ip:端⼝
root:制定的hdfs根⽬录
proxy:制定登陆的⽤户⾝份
timeout:设置的超时时间
client.list(“/”)
[u’home’,u’input’, u’output’, u’tmp’]
3:dir——查看⽀持的⽅法dir(client)
4:status——获取路径的具体信息
其他参数:status(hdfs_path, strict=True)
hdfs_path:就是hdfs路径
strict:设置为True时,如果hdfs_path路径不存在就会抛出异常,如果设置为False,如果路径为不存在,则返回Nonepython怎么读取dat文件
5:list——获取指定路径的⼦⽬录信息client.list(“/”)
[u’home’,u’input’, u’output’, u’tmp’]
其他参数:list(hdfs_path, status=False)
status:为True时,也返回⼦⽬录的状态信息,默认为Flase
6:makedirs——创建⽬录client.makedirs(“/123”)
其他参数:makedirs(hdfs_path, permission=None)
permission:设置权限client.makedirs(“/test”,permission=777)
7: rename—重命名ame(“/123″,”/test”)
8:delete—删除client.delete(“/test”)
其他参数:delete(hdfs_path, recursive=False)
recursive:删除⽂件和其⼦⽬录,设置为False如果不存在,则会抛出异常,默认为False
9:upload——上传数据client.upload(“/test”,”F:\[PPT]Google Protocol Buffers.pdf”);
其他参数:upload(hdfs_path, local_path, overwrite=False, n_threads=1, temp_dir=None,
chunk_size=65536,progress=None, cleanup=True, **kwargs)
overwrite:是否是覆盖性上传⽂件
n_threads:启动的线程数⽬
temp_dir:当overwrite=true时,远程⽂件⼀旦存在,则会在上传完之后进⾏交换
chunk_size:⽂件上传的⼤⼩区间
progress:回调函数来跟踪进度,为每⼀chunk_size字节。它将传递两个参数,⽂件上传的路径和传输的字节数。⼀旦完成,-1将作为第⼆个参数
cleanup:如果在上传任何⽂件时发⽣错误,则删除该⽂件
10:download——下载client.download(“/”,”/home”)
11:read——读取⽂件ad(“/test/[PPT]Google Protocol Buffers.pdf”) as reader:
ad()
其他参数:
read(*args, **kwds)
hdfs_path:hdfs路径
offset:设置开始的字节位置
length:读取的长度(字节为单位)
buffer_size:⽤于传输数据的字节的缓冲区的⼤⼩。默认值设置在HDFS配置。
encoding:制定编码
chunk_size:如果设置为正数,上下⽂管理器将返回⼀个发⽣器产⽣的每⼀chunk_size字节⽽不是⼀个类似⽂件的对象
delimiter:如果设置,上下⽂管理器将返回⼀个发⽣器产⽣每次遇到分隔符。此参数要求指定的编码。
progress:回调函数来跟踪进度,为每⼀chunk_size字节(不可⽤,如果块⼤⼩不是指定)。它将传递两个参数,⽂件上传的路径和传输的字节数。称为⼀次与- 1作为第⼆个参数。
问题:
1.hdfs.util.HdfsError: Permission denied: user=dr.who, access=WRITE, inode=”/test”:root:supergroup:drwxr-xr-x
解决办法是:在配置⽂件l中加⼊
-inputformat
-file
-D
Map.py:#!/usr/local/bin/python
import sys
for line in sys.stdin:
ss = line.strip().split(' ')
for s in ss:
if s.strip()!= "":
print "%s\t%s"% (s, 1)
Reduce.py:#!/usr/local/bin/python
import sys
current_word = None
count_pool = []
sum = 0
for line in sys.stdin:
word, val = line.strip().split('\t')
if current_word== None:
current_word = word
if current_word!= word:
for count in count_pool:
sum += count
print "%s\t%s"% (current_word, sum)
current_word = word
count_pool = []
sum = 0
count_pool.append(int(val))
for count in count_pool:
sum += count
print "%s\t%s"% (current_word, str(sum))Run.sh:
HADOOP_CMD="/data/hadoop-2.7.0/bin/hadoop"
STREAM_JAR_PATH="/data/hadoop-2.7.0/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar" INPUT_FILE_PATH_1="/The_Man_"
OUTPUT_PATH="/output"
$HADOOP_CMD fs -rmr-skipTrash $OUTPUT_PATH
# Step 1.
$HADOOP_CMD jar$STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper"python map.py" \
-reducer "pythonred.py" \
-file ./map.py \
-file ./red.py
⽬的:通过python模拟mr,计算每年的最⾼⽓温。
1. 查看数据⽂件,需要截取年份和⽓温,⽣成key-value对。[tianyc@TeletekHbase python]$ cat test.9999999N9+ 9999999N9+ 0500001N9+
<0500001N9+
2. 编写map,打印key-value对[tianyc@TeletekHbase python]$ cat map.py
import re
import sys
for line in sys.stdin:
val=line.strip()
(year,temp)=(val[15:19],val[40:45])
print "%s\t%s" % (year,temp)
[tianyc@TeletekHbase python]$ cat test.dat|python map.py
1950 +0000
1950 +0022
1950 -0011
1949 +0111
1949 +0078
3. 将结果排序[tianyc@TeletekHbase python]$ cat test.dat|python map.py |sort
1949 +0078
1949 +0111
1950 +0000
1950 -0011
1950 +0022
4. 编写redurce,对map中间结果进⾏处理,⽣成最终结果[tianyc@TeletekHbase python]$ cat red.py import sys
(last_key,max_val)=(None,0)
for line in sys.stdin:
(key,val)=line.strip().split('\t')
if last_key and last_key!=key:
print '%s\t%s' % (last_key, max_val)
(last_key, max_val)=(key,int(val))
else:
(last_key, max_val)=(key,max(max_val,int(val)))
if last_key:
print '%s\t%s' % (last_key, max_val)
5. 执⾏。[tianyc@TeletekHbase python]$ cat test.dat|python map.py |sort|python red.py
1949 111
1950 22
使⽤python语⾔进⾏MapReduce程序开发主要分为两个步骤,⼀是编写程序,⼆是⽤Hadoop Streaming命令提交任务。
还是以词频统计为例
⼀、程序开发
1、Mapperfor line in sys.stdin:
filelds = line.strip.split(' ')
for item in fileds:
print item+' '+'1'
2、Reducerimport sys
result={}
for line in sys.stdin:
kvs = line.strip().split(' ')
k = kvs[0]
v = kvs[1]
if k in result:
result[k]+=1
else:
result[k] = 1
for k,v in result.items():
print k+' '+v
....
写完发现其实只⽤map就可以处理了…reduce只⽤cat就好了
3、运⾏脚本
1)Streaming简介
Hadoop的MapReduce和HDFS均采⽤Java进⾏实现,默认提供Java编程接⼝,⽤户通过这些编程接⼝,可以定义map、reduce函数等等。
但是如果希望使⽤其他语⾔编写map、reduce函数怎么办呢?
Hadoop提供了⼀个框架Streaming,Streaming的原理是⽤Java实现⼀个包装⽤户程序的MapReduce程序,该程序负责调⽤hadoop提供的Java编程接⼝。
2)运⾏命令/.../bin/hadoop streaming
-input /..../input
-output /..../output
-mapper "mapper.py"
-reducer "reducer.py"
-file mapper.py

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。