帮酷LOGO
0 0 评论
  • 显示原文与译文双语对照的内容
文章标签:Hadoop  
Ruby libraries for efficient, effective Hadoop streaming

  • 源代码名称:wukong
  • 源代码网址:http://www.github.com/infochimps-labs/wukong
  • wukong源代码文档
  • wukong源代码下载
  • Git URL:
    git://www.github.com/infochimps-labs/wukong.git
  • Git Clone代码到本地:
    git clone http://www.github.com/infochimps-labs/wukong
  • Subversion代码到本地:
    $ svn co --depth empty http://www.github.com/infochimps-labs/wukong
    Checked out revision 1.
    $ cd repo
    $ svn up trunk
  • Wukong

    Wukong是一个快速。灵活的数据应用程序开发工具包。

    在Wukong中的核心概念是处理器。 Wukong处理器是简单的ruby 类,做一件事情并做好。 代码库实现处理器和其他核心Wukong类,并提供了在命令行上运行和组合处理器。

    大量的wukong主题是的黑盒子,漂亮的胶水。 Wukong生态系统由其他工具组成,这些工具在各种不同的后端中运行Wukong处理器。 用Wukong编写的代码可以很容易地在环境和框架之间移植: 本地命令行脚本会立即变成在Hadoop中运行的强大的任务。

    下面是一些它的他项目的列表,你可以在尝试了解完整的Wukong体验时阅读该项目:

    更全面的视角也请参见Infochimps平台 Community Edition ( FIXME: 将所有的工具组合到一起,使之适合开发人员的肩膀,这将把所有的工具结合起来。

    编写简单的处理器

    在Wukong中计算的基本单元是处理器。 处理器是 ruby 类

    • 子类 Wukong::Processor ( 这里方法使用 Wukong.processor 方法作为 sugar )
    • 定义一个 process 方法,它接受输入记录,执行某些操作,并在输出中调用 yield

    下面是一个反转它的输入记录的处理器:

    # in string_reverser.rbWukong.processor(:string_reverser) dodefprocessstringyield string.reverse
     endend

    可以使用文本文件在 命令行 上运行这里处理器,使用Wukong附带的wu-local 工具作为输入:

    $ cat novel.txt
    It was the best of times, it was the worst of times.
    ...
    $ cat novel.txt | wu-local string_reverser.rb
    .semit fo tsrow eht saw ti, semit fo tseb eht saw tI

    wu-local 程序从标准标准中时间消耗一行,并将处理器方法的process 调用为 ruby 字符串对象。 在你的流程方法中,你 yield的每个对象都将在 STDOUT。

    多个处理器,多个( 或者不) 输出

    处理器将被组合使用,以便它们可以存储在与这两个相关处理器相同的文件中:

    # in processors.rbWukong.processor(:splitter) dodefprocessline line.split.each { |token| yield token }
     endendWukong.processor(:normalizer) dodefprocesstoken stripped = token.downcase.gsub(/W/,'')
     yield stripped if stripped.size >0endend

    注意 splitter 如何为它的每个输入标记生成多个标记,并且 normalizer 有时可能永远不会屈服,这取决于它的输入。 处理器不受框架的任何责任来产生或者返回任何内容,这样它们就可以轻松地充当过滤器。

    文件中有两个处理器,并且不共享 NAME 与文件("处理器")的basename,因这里 wu-local 无法自动选择处理器。 我们可以使用 --run 选项显式指定一个:

    $ cat novel.txt | wu-local processors.rb --run=splitter
    It
    was
    the
    best
    of
    times,
    ...

    我们可以把这两个处理器组合在一起。

    $ cat novel.txt | wu-local processors.rb --run=splitter | wu-local processors.rb --run=normalizer
    it
    was
    the
    best
    of
    times
    ...

    但是有一个简单的方法可以用 dataflows来实现这一点。

    添加可以配置选项

    处理器可以在命令行。配置文件或者其他各种地方( 感谢 Configliere ) 中设置选项,这些选项可以在命令行中设置。

    这个处理器根据观测值假定给定的平均值和标准偏差,从观测中。 它使用两个非标准字段平均或者平均分布( mean ) 及其标准偏差( std_dev )。 从这个信息中,它将测量所有输入值的百分比。

    # in percentile.rbWukong.processor(:percentile) doSQRT_1_HALF=Math.sqrt(0.5)
     field :mean, Float, :default => 0.0 field :std_dev, Float, :default => 1.0defprocessvalue observation = value.to_f
     z_score = (mean - observation) / std_dev
     percentile =50*Math.erfc(z_score *SQRT_1_HALF)
     yield [observation, percentile].join("t")
     endend

    这些字段有默认值,但是你可以将它们限制在 命令行 上。 if考试成绩为 80分数,分数为 10,分数为,则你的分数为:

    $ echo 95 | wu-local/tmp/percentile.rb --mean=80 --std_dev=10
    95.0 93.3192798731142

    如果考试比较困难,平均为 75点,标准偏差为 8点,那么你将在 99th %的分数 !

    $ echo 95 | wu-local/tmp/percentile.rb --mean=75 --std_dev=8
    95.0 99.37903346742239

    处理器的生命周期

    处理器有一个生命周期,它们在 Wukong runner的上下文中运行时,像 wu-local 或者 wu-hadoop 一样运行。 每个生命周期阶段对应于调用的处理器的方法:

    • 在处理完处理器之后,setup 被调用,但是在开始之前,它被初始化。 这种方法你不能屈服。
    • process 为每个输入记录调用一次,可能产生一次,多次或者无一次。
    • 当最后一次记录被处理,但是处理器仍然有机会生成记录时,调用了 finalize
    • stop 调用向处理器发送所有工作应该停止,打开连接应该关闭,& c。 这种方法你不能屈服。

    上面的例子已经集中于 process 方法。

    通常将 setupstop 方法一起用于处理外部连接

    # in geolocator.rbWukong.processor(:geolocator) do field :host, String, :default => 'localhost'attr_accessor:connectiondefsetupself.connection =Database::Connection.new(host)
     enddefprocessrecord record.added_value = connection.find("...some query...")
     enddefstopself.connection.close
     endend

    在编写包含存储或者聚合信息的"缩小"-type操作之前,finalize 方法最有用,直到满足某些条件为止。 在最后一个记录被给定( 到 process ) 之后,它将被调用,但是你可以在你自己的代码中调用它。

    下面是使用 finalize 方法实现计数所有输入记录的简单计数器的示例:

    # in counter.rbWukong.processor(:counter) doattr_accessor:countdefsetupself.count =0enddefprocessthingself.count +=1enddeffinalizeyield count
     endend

    这取决于最后一个输入记录将被传递给 process ,并且只有随后才会调用。 这允许最后一个输入记录是 counted/processed/aggregated,然后在最后处理整个聚合。

    因为这个重点在于构建和处理聚合,所以 finalize 方法在处理器中通常用作Hadoop环境。

    注意:在每个可能的环境中,Finalize都不保证被调用,因为它取决于所选的runner。 在本地或者Hadoop环境中,"最后记录"的概念有意义,因此相应的跑步者将调用 finalize。 在风暴的环境中,最后一个记录的概念不是( 应该是) 意义上的,相应的runner 并没有调用它。

    命令行序列化

    wu-local ( 还有很多类似的工具) 以字符串形式处理输入和输出。

    处理器希望尽可能接近它的域来处理对象。 用 Twitter 句柄装饰地址簿条目的处理器不想将它的输入视为字符串,而是散列或者更好。

    Wukong使处理器与它的他处理器一起容易地解析记录的常见任务,并将它的转换为 ruby 模型。

    序列化数据格式,如JSON或者 TSV

    Wukong可以解析和发出像JSON和CSV等常见的数据格式,使你不会对自己的处理器进行污染。

    下面是一个想处理哈希作为输入的处理器的例子。

    # in extractor.rbWukong.processor(:extractor) dodefprocesshshyield hsh["first_name"]
     endend

    给定JSON数据

    $ cat input.json
    {"first_name":"John","last_name":,"Smith"}
    {"first_name":"Sally","last_name":,"Johnson"}
    ...

    你可以直接将它馈送给处理器

    $ cat input.json | wu-local --from=json extractor.rb
    John
    Sally
    ...

    其它处理器真的喜欢数组:

    # in summer.rbWukong.processor(:summer) dodefprocessvaluesyield values.map(&:to_f).inject(&:+)
     endend

    所以你可以给他们TSV数据

    $ cat data.tsv
    1 2 3
    4 5 6
    7 8 9
    ...
    $ cat data.tsv | wu-local --from=tsv summer.rb
    6
    15
    24
    ...

    但是,你可以很容易地使用CSV数据相同的代码

    $ cat data.tsv | wu-local --from=csv summer.rb

    或者更一般的分隔格式。

    $ cat data.tsv | wu-local --from=delimited --delimiter='--' summer.rb
    Recordizing数据结构到域模型

    下面是一个依赖于人员模型来决定是否应该生成联系人条目的联系人验证程序:

    # in contact_validator.rbrequire'person'Wukong.processor(:contact_validator) dodefprocesspersonyield person if person.valid?
     endend

    依赖( 定义的其他位置) 人模型定义 valid? 意味着处理器可以保持苗条和可读。 Wukong可以将输入文本与上面的反序列化功能结合,将输入文本转换为 Person:

    $ cat input.json | wu-local --consumes=Person --from=json contact_validator.rb
    #<Person:0x000000020e6120>
    #<Person:0x000000020e6120>
    #<Person:0x000000020e6120>

    wu-local 还可以序列化 contact_validator 处理器中的记录:

    $ cat input.json | wu-local --consumes=Person --from=json contact_validator.rb --to=json
    {"first_name":"John","last_name":,"Smith","valid":"true"}
    {"first_name":"Sally","last_name":,"Johnson","valid":"true"}
    ...

    序列化格式与反序列化格式类似,使用JSON和带分隔符的格式。

    解析记录到模型实例并再次序列化它们将对提供这些实例的模型类进行约束。 下面是 Person 类所需的外观:

    # in person.rbclassPerson# Create a new Person from the given attributes. Supports usage of# the `--consumes` flag on the command-line## @param [Hash] attrs# @return [Person]defself.receiveattrsnew(attrs)
     end# Turn this Person into a basic data structure. Supports the usage# of the `--to` flag on the command-line.## @return [Hash]defto_wire to_hash
     endend

    为了支持 --consumes=Person 语法,receive 类方法必须采用由 --from 参数的操作生成的散列,并返回 Person 实例。

    为了支持 --to=json 语法,Person 类必须实现 to_wire 实例方法。

    日志记录和通知

    Wukong提供了一个记录器,所有处理器都可以通过 log 属性访问它。 这里记录器具有以下优先级:

    • debug ( 可以设置为日志级别)
    • 信息( 可以设置为日志级别)
    • 警告( 可以设置为日志级别)
    • 错误
    • 致命

    这里是一个使用它们的处理器

    # in logs.rbWukong.processor(:logs) dodefprocessline log.debug line
     log.info line
     log.warn line
     log.error line
     log.fatal line
     endend

    默认日志级别是调试。

    $ echo something | wu-local logs.rb
    DEBUG 2013-01-11 23:40:56 [Logs ] -- something
    INFO 2013-01-11 23:40:56 [Logs ] -- something
    WARN 2013-01-11 23:40:56 [Logs ] -- something
    ERROR 2013-01-11 23:40:56 [Logs ] -- something
    FATAL 2013-01-11 23:40:56 [Logs ] -- something

    虽然你可以将它设置为全局

    $ echo something | wu-local logs.rb --log.level=warn
    WARN 2013-01-11 23:40:56 [Logs ] -- something
    ERROR 2013-01-11 23:40:56 [Logs ] -- something
    FATAL 2013-01-11 23:40:56 [Logs ] -- something

    或者按每类的基础。

    创建文档

    wu-local 包含一个帮助消息:

    $ wu-local --help
    usage: wu-local [ --param=val | --param | -p val | -p ] PROCESSOR|FLOW
    wu-local is a tool for running Wukong processors and flows locally on
    the command-line. Use wu-local by passing it a processor and feeding
    ...
    Params:
     -r, --run=String Name of the processor or dataflow to use. Defaults to basename of the given path.

    你可以为自己的处理器生成自定义帮助消息。 以前是以前的百分比处理器,但更适合于好文档:

    # in percentile.rbWukong.processor(:percentile) do description <<-EOF.gsub(/^ {2}/,'') This processor calculates percentiles from input scores based on a given mean score and a given standard deviation for the scores. The mean and standard deviation are given at run time and processed scores will be compared against the given mean and standard deviation. The input is expected to consist of float values, one per line. Example: $ cat input.dat 88 89 77. . . $ cat input.dat | wu-local percentile.rb --mean=85 --std_dev=7 88.0 66.58824291023753 89.0 71.61454169013237 77.0 12.654895447355777 EOFSQRT_1_HALF=Math.sqrt(0.5)
     field :mean, Float, :default => 0.0, :doc => "The mean of the assumed distribution" field :std_dev, Float, :default => 1.0, :doc => "The standard deviation of the assumed distribution"defprocessvalue observation = value.to_f
     z_score = (mean - observation) / std_dev
     percentile =50*Math.erfc(z_score *SQRT_1_HALF)
     yield [observation, percentile].join("t")
     endend

    除了原始 --help 参数之外,如果你将 wu-local 调用到这里处理器,那么你将得到自定义文档。

    $ wu-local percentile.rb --help
    usage: wu-local [ --param=val | --param | -p val | -p ] PROCESSOR|FLOW
    This processor calculates percentiles from input scores based on a
    given mean score and a given standard deviation for the scores.
    ...
    Params:
     --mean=Float The mean of the assumed distribution [Default: 0.0]
     -r, --run=String Name of the processor or dataflow to use. Defaults to basename of the given path.
     --std_dev=Float The standard deviation of the assumed distribution [Default: 1.0]

    将处理器合并到 Dataflows

    Wukong提供了一个将处理器组合到dataflows中的DSL。 这种DSL旨在使复制尝试和使用简单工具构建简单工具,然后结合它们来创建更复杂的流程。

    例如,编写 tokenizer 处理器后,我们可以在数据流中使用它,以及在上一个示例中复制的regexp 处理器。

    # in find_t_words.rbrequire_relative('processors')Wukong.dataflow(:find_t_words) do tokenizer | regexp(match:/^t/)end

    | 运算符将一个处理器( 什么是 yield s )的输出与另一个( 它的process 方法)的输入连接。 在这个例子中,tokenizer 发出的每个记录都将被 regexp 处理。

    你可以直接运行这个数据流( mimicing我们在命令行上把单处理器连接在一起):

    $ cat novel.txt | wu-local find_t_words.rb
    the
    times
    the
    times
    ...

    更复杂的数据流拓扑

    将处理器放在线性管道中相比,Wukong数据流DSL允许更复杂的拓扑。

    上面例子中用于将两个处理器连接到一个链中的| 操作符也可以用来连接单个处理器,在数据流中创建一个分支点。 流的每个分支将接收相同的记录。

    可以用于执行具有相同记录的多个操作,如下面的示例所示:

    # in book_reviews.rbWukong.dataflow(:complicated) do from_json | recordize(model:BookReview) | 
     [
     map(&:author) | do_author_stuff |.. . | to_json,
     map(&:book) | do_book_stuff |.. . | to_json,
     ]end

    通过 recordize 处理器生成的每个 BookReview 记录都将传递给流的后续分支,每个分支都执行不同类型的处理。 运行时,两个分支( 这里是 to_json 首先)的输出记录将在最后输出中被穿插。

    select 这样的处理器可以用于将它的输入过滤,用于将流分割为两种类型的记录:

    # in complicated.rbWukong.dataflow(:complicated) do from_json | parser | 
     [
     select(&:valid?) | further_processing |.. . | to_json,
     select(&:invalid?) | track_errors | null
     ]end

    在这里,只有响应 true的记录将通过第一个流( 正在应用 further_processing 等),而只记录响应 true的记录将通过第二个流( 使用 track_errors ) 来传递。 第二个分支末尾的null 处理器确保最终输出中只发出来自第一个分支的记录。

    可以重新分割流,在处理输入源时,可以使用丰富的语义:

    # in many_splits.rbWukong.dataflow(:many_splits) do from_json | parser | recordize(model:BookReview) | [ 
     map(&:author) |.. . | to_json,
     map(&:publisher) | [
     select(&:domestic?) |.. . | to_json,
     select(&:international?) | 
     [
     select(&:north_american?) |.. . | 
     [
     select(&:american?) |.. . | to_json,
     select(&:canadian?) |.. . | to_json,
     select(&:mexican?) |.. . | to_json,
     ],
     select(&:asian?) |.. . | to_json,
     select(&:european?) |.. . | to_json,
     ],
     ],
     map(&:title) |.. . | to_json
     ]end

    <一个 name="序列化>"

    命令行序列化

    处理器的进程方法必须接受字符串参数并生成字符串参数( 或者是能适当地)。

    定义 consumesemits 自动处理序列化和反序列化的能力。

    <一个 name="小部件>

    小部件

    Wukong有许多内置部件,对于搭建你的dataflows或者作为你自己的处理器的起点,你可以使用它。

    对于这些小部件,你可以得到定制的帮助,比如

    $ wu-local group --help

    序列化程序

    序列化器是不改变记录语义意义的小部件,只是它的表达。 下面是一个列表:

    • to_jsonfrom_json,用于将记录转换为JSON或者将JSON解析为记录
    • to_tsvfrom_tsv,用于将 array 记录转换为ipqos或者将它的解析为 array 记录
    • 用于打印JSON输入的pretty

    在编写能够独立运行的处理器时,需要使用命令行上的序列化/反序列化选项 --to--from 确保在命令行中反序列化和序列化记录,如所示。

    对于只在数据流中运行的处理器,你可以通过在最开始和结束时不执行任何( e ) 序列化来优化。

    Wukong.dataflow(:complicated) do from_json | proc_1 | proc_2 | proc_3.. . proc_n | to_jsonend

    这种方法不会在处理器之间进行序列化,只在开始和结束时。

    ( 这实际上是序列化选项本身的实现,它们动态地附加/附加适当的反序列化/序列化程序)

    通用用途

    有几个通用处理器实现输入和输出数据的通用模式。 这些在数据流定义的上下文中是最有用的。

    • null 做你认为它没有
    • map 在每个模块上执行块
    • flatten 使输入数组平坦
    • filterselectreject 仅允许某些记录基于一个块
    • regexpnot_regexp 仅通过匹配( 或者不匹配)的记录正则表达式
    • limit 只允许一些记录通过
    • logger 将事件发送到本地日志流
    • extract 提取每个输入事件的某些部分

    其中一些小部件可以直接使用,也可以使用一些参数

    Wukong.processor(:log_everything) do proc_1 | proc_2 |.. . | loggerendWukong.processor(:log_everything_important) do proc_1 | proc_2 |.. . | regexp(match:/important/i) | loggerend

    其他部件需要一个块来定义它的操作:

    Wukong.processor(:log_everything_important) do parser | select { |record| record.priority =~/important/i } | loggerend

    减速器

    有一些小部件可以进行聚合运算,如计数。排序和求和。

    • count 发出所有输入记录的最终计数
    • sort 可以排序输入流
    • group 通过一些提取部分对记录进行分组,并给出每组大小的计数
    • moments 将发出更复杂的统计数据( 即,std。 在给定的一些值的基础上进行测试

    下面是在 命令行 上对数据进行排序的示例

    $ head tokens.txt | wu-local sort
    abhor
    abide
    abide
    able
    able
    able
    about
    ...

    尝试添加组:

    $ head tokens.txt | wu-local sort | wu-local group
    {:group=>"abhor", :count=>1}
    {:group=>"abide", :count=>2}
    {:group=>"able", :count=>3}
    {:group=>"about", :count=>3}
    {:group=>"上面", :count=>1}
    ...

    你还可以在更复杂的数据流中使用这些数据:

    Wukong.dataflow(:word_count) do tokenize | remove_stopwords | sort | groupend

    命令

    Wukong附带了几个内置命令。

    local

    你已经在上面的许多示例中看到过,wu-local。 使用 STDINSTDOUT 在本地对dataflows进行建模和输出,并使用 wu-local

    wu-localwu-storm 等更复杂的命令中,是一个"核心层"Wukong命令,由Wukong插件实现,最终调用一些 wu-local 过程。

    Wukong还附带了另一个基本命令 wu-source。 除了从 STDIN 读取任何输入外,这里命令与 wu-local 非常类似。 相反,它在一个易于配置的周期性的方式中生成自己的输入记录。 因此,它作为UNIX管道中其他进程的数据源,作为数据源的。

    下面是使用 identity 处理器的示例,它将对接收到的准确输入进行打印效果:

    $ wu-source identity
    1
    2
    3
    ...

    从这个例子很明显,wu-source 产生的记录是从 1开始的连续整数,它们是以每秒一条记录的速率生成的。

    因此 wu-source 可以用于将任何处理器( 数据流) 转换为数据源:

    # in random_numbers.rbWukong.processor(:random_numbers) dodefprocessindexyield rand() * index.to_i
     endend

    像这样运行 random_numbers:

    $ wu-source random_numbers.rb
    0.7671364694830113
    0.5958089791553307
    1.8284806932633886
    3.707189931235327
    4.106618048255548
    ...

    它产生的随机数是 GREATER的。

    你还可以完全忽略处理器中 wu-source的输入记录:

    # in generator.rbWukong.processor(:generator) dodefprocess_yield new_record
     enddefnew_recordMyRecord.new(...)
     endend

    它可以产生 MyRecord 实例,因为它是由 wu-source 驱动的。

    这种方法很容易每秒使用 wu-source 生成数千事件:

    $ wu-source generator.rb --per_sec=2000

    或者使用 --period ( 它是 --per_sec的逆) 在常规间隔( 这里示例中的每 5分钟) 中输出记录:

    $ wu-source generator.rb --period=300

    wu-source 可以与你可能编写的其他dataflows或者程序组合使用:

    $ wu-source generator.rb --per_sec=200 | wu-local my_flow

    使用 wu 命令的上下文中使用任何其他 wu- 命令时,命令是方便命令。

    而不是键入

    $ bundle exec wu-local my_flow --option=value.. .

    使用 wu-local 命令在项目和 Gemfile.lock 中声明的wukong ( 以及任何其他依赖关系),可以运行,这个命令可以让你

    $ wu local my_flow --option=value.. .

    本质上将 bundle exec 前缀和 munging wu-local 添加到 wu-local 中。 这在使用Wukong做很多工作时很有帮助。

    注意:如果项目在你的项目中工作,但 wu whatever 失败,可能是因为Bundler解析了 wu- 命令,而某些安装不是在你的$PATH ( 通常情况下,如果你运行 bundle install --standalone ) 中的某些安装。 确保 wukong gem 安装在系统上,并且它的二进制文件是你使用 wu 命令的$PATH

    测试

    Wukong附带了几个帮助器,可以使用 RSpec来编写写规范。

    在处理器中你需要测试的唯一方法是 process 方法。 处理器和功能的其他方法由Wukong提供,已经测试过了。

    你可能需要用两种方法测试这里过程方法:

    • 类本身在各种上下文中的单元测试
    • 使用 wu-local ( 或者其他) 命令行 runner 运行类的集成测试

    数据驱动的单元测试

    让我们从简单的处理器开始

    # in tokenizer.rbWukong.processor(:tokenizer) dodefprocesstext text.downcase.gsub(/[^sw]/,'').split.each do |token|
     yield token
     endendend

    你可以直接测试这里处理器:

    # in spec/tokenizer_spec.rbrequire'spec_helper'describe :tokenizerdo subject { Wukong::Processor::Tokenizer.new }
     before { subject.setup }
     after { subject.finalize ; subject.stop }
     it "correctly counts tokens"do expect { |b| subject.process("Hi there, Wukong!", &b) }.to yield_successive_args('hi', 'there', 'wukong')
     endend

    但是必须自己处理块的收益率可能导致冗长和不可读的测试。 Wukong为这个案例定义了一些助手。 需要并在你的spec_helper.rb 中首先包含它们:

    # spec/spec_helper.rbrequire'wukong'require'wukong/spec_helpers'RSpec.configure do |config|
     config.include(Wukong::SpecHelpers)end

    然后在你的测试中使用

    # in spec/tokenizer_spec.rbrequire'spec_helper'describe :tokenizerdo it_behaves_like 'a processor', :named => :tokenizer it "emits the correct number of tokens"do processor.given("Hi there.nMy name is Wukong!").should emit(6).records
     end it "eliminates all punctuation"do processor(:tokenizer).given("Never!").should emit('Never')
     end it "will not emit tokens in a stop list"do processor(:tokenizer, :stop_list => ['apples', 'bananas']).given("I like apples and bananas").should emit('I', 'like', 'and')
     endend

    让我们看看各种 helper:

    • a processor 共享示例( 使用 rspec helper的it_behaves_like 调用) 添加一些测试,确保处理器符合 Wukong::Processor的API。

    • processor 方法实际上是更恰当命名的( 但不方便) unit_test_runner的别名。 这里方法接受处理器 NAME 和选项( 就像 wu-local 和其他命令行工具一样),并返回 Wukong::UnitTestRunner 实例。 这里 runner 处理

      ( 已经注册) 处理器 NAME 和选项,并创建一个新处理器。 如果未给出 NAME,则使用封闭 describe 或者 context 块的参数。 由 processor 返回的对象是你正在测试的Wukong::Processor,因此你可以直接对它的进行 introspect,或者声明对它的行为的期望。

    • given 方法( 和其他助手如 given_jsongiven_tsv。& c。) 是 runner 上的一种方法。 它是一种延迟向处理器提供记录的方法,不必直接执行 process 方法,也必须处理处理器。

    • 当调用 outputemit 匹配器时,它们将对所有以前的given 记录进行 process。 这让你可以分离实例化。输入。期望和输出。 下面是一个更复杂的例子。

    同样的助手也可以用来测试dataflows以及处理器。

    函数 vs-对象

    上述测试助手旨在帮助测试处理器的功能,因为:

    • 他们接受

    集成测试

    如果你正在实现一个新的Wukong命令( 类似于 wu-local ),那么你还可能希望运行集成测试。 Wukong也附带了帮助器。

    你几乎总是能够在没有集成测试的情况下测试你的处理器。 无论部署的环境如何,你的单元测试和Wukong框架本身都应该确保你的处理器正常工作。

    # spec/integration/tokenizer_spec.rbcontext "running the tokenizer with wu-local"do subject { command("wu-local tokenizer") <"hi there" }
     it { should exit_with(0) }
     it { should have_stdout("hi", "there") }endcontext "interpreting its arguments"do context "with a valid --match argument"do subject { command("wu-local tokenizer --match='^hi'") <"hi there" }
     it { should exit_with(0) }
     it { should have_stdout("hi") }
     it { should_not have_stdout("there") }
     end context "with a malformed --match argument"do# invalid b/c the regexp is broken... subject { command("wu-local tokenizer --match='^(h'") <"hi there" }
     it { should exit_with(:non_zero) }
     it { should have_stderr(/invalid/) }
     endend

    让我们通过助手:

    • command helper 围绕将启动的命令行创建包装器。 命令和工作目录的环境将从 ENVDir.pwd的当前值获取,除非

      • in 或者 using 参数与 command 链接以指定工作目录和环境:
      command("some-command with --args").in("/my/working/directory").using("THIS" => "ENV_HASH", "WILL_BE" => "MERGED_OVER_EXISTING_ENV")
      • 调用 command helper的范围定义了方法 integration_cwdintegration_env。 这可以通过在 spec_helper.rb 中包括一个 MODULE 来完成:
      # in spec/support/integration_helper.rbmoduleIntegrationHelperdefintegration_cwd"/my/working/directory"enddefintegration_env { "THIS" => "ENV_HASH", "WILL_BE" => "MERGED_OVER_EXISTING_ENV" }
       endend# in spec/spec_helper.rbrequire_relative("support/integration_helper")RSpec.configure do |config|
       config.include(IntegrationHelper)end
    • command helper 可以接受带有 < 方法的输入。 输入可以是字符串,也可以是字符串的array。 它将通过STDIN传递给命令。

    • have_stdouthave_stderr 允许你为特定字符串或者 正规表达式 测试命令的STDOUT或者 STDERR。

    • matcher允许你测试命令的退出代码。 你可以通过符号 :non_zero 来设定期望的期望,任何非零退出码。

    插件插件

    Wukong有一个内置的插件框架,可以容易地将Wukong处理器适应新的后端或者添加它的他。 它所支持的Wukong::Local MODULE 和 wu-local 程序本身就是一个 Wukong。

    下面展示了如何将 Wukong::Local的简化版本构建为新插件。 我们将调用这个插件 Cat,因为它将实现一个与 wu-local ( 简化了) 类似的程序 wu-cat

    首先要做的是在代码中包含 Wukong::Plugin MODULE:

    # in lib/cat.rb## This Wukong plugin works like wu-local but replicates some silly# features of cat like numbered lines.moduleCat# This registers Cat as a Wukong plugin.includeWukong::Plugin# Defines any settings specific to Cat. Cat doesn't need to, but# you can define global settings here if you want. You can also# check the `program` name to decide whether to apply your settings.# This helps you not pollute other commands with your stuff.defself.configuresettings, programcase program
     when'wu-cat' settings.define(:input, :description => "The input file to use")
     settings.define(:number, :description => "Prepend each input record with a consecutive number", :type => :boolean)
     else# configure other programs if you need toendend# Lets Cat boot up with settings that have already been resolved# from the command-line or other sources like config files or remote# servers added by other plugins.## The `root` directory in which the program is executing is also# provided.defself.bootsettings, root puts "Cat booting up using resolved settings within directory #{root}"endend

    如果插件不直接与命令行( 像 wu-local 或者 wu-hadoop 这样的wu工具) 交互,并且不直接接口传递到处理器,那么就需要完成。

    编写一个与命令行交互的runner

    如果你需要实现一个新的命令行 工具,那么你应该编写一个 runner。 一个 runner 用来实现Wukong程序,如 wu-local 或者 wu-hadoop。 下面是我们的插件插件示例 wu-cat的实际程序文件的内容。

    #!/usr/bin/env ruby# in bin/wu-catrequire'cat'Cat::Runner.run

    Cat::Runner 类是单独实现的。

    # in lib/cat/runner.rbrequire_relative('driver')moduleCat# Implements the `wu-cat` command.classRunner <Wukong::Runner usage "PROCESSOR|FLOW" description <<-EOF wu-cat lets you run a Wukong processor or dataflow on the command-line. Try it like this. $ wu-cat --input=data.txt hello my friend Connect the output to a processor in upcaser.rb $ wu-cat --input=data.txt upcaser.rb HELLO MY FRIEND You can also include add line numbers to the output. $ wu-cat --number --input=data.txt upcaser.rb 1 HELLO 2 MY 3 FRIEND EOF# The name of the processor we're going to run. The #args method# is provided by the Runner class.defprocessor_name args.first
     end# Validate that we were given the name of a registered processor# to run. Be careful to return true here or validation will fail.defvalidateraiseWukong::Error.new("Must provide a processor as the first argument") unless processor_name
     trueend# Delgates to a driver class to run the processor.defrunDriver.new(processor_name, settings).start
     endendend

    编写驱动程序与处理器交互

    Cat::Runner#run 方法委托给 Cat::Driver 类来处理实例化和与处理器交互。

    # in lib/cat/driver.rbmoduleCat# A class for driving a processor from `wu-cat`.classDriver# Lets us count the records.attr_accessor:number# Gives methods to construct and interact with dataflows.includeWukong::DriverMethods# Create a new Driver for a dataflow with the given `label` using# the given `settings`.## @param [String] label the name of the dataflow# @param [Configliere::Param] settings the settings to use when creating the dataflowdefinitializelabel, settingsself.settings = settings
     self.dataflow = construct_dataflow(label, settings)
     self.number =1end# The file handle of the input file.## @return [File]definput_file@input_file||=File.new(settings[:input])
     end# Starts feeding records to the processordefstartwhile line = input_file.readline rescuenil driver.send_through_dataflow(line)
     endend# Process each record that comes back from the dataflow.## @param [Object] record the yielded recorddefprocessrecordif settings[:number]
     puts [number, record].map(&:to_s).join("t")
     else puts record.to_s
     endself.number +=1endendend


    文章标签:Hadoop  

    Copyright © 2011 HelpLib All rights reserved.    知识分享协议 京ICP备05059198号-3  |  如果智培  |  酷兔英语