maven方式:
<dependency>
<groupId>com.qiniu</groupId>
<artifactId>transform-plugin-sdk</artifactId>
<version>1.0.1</version>
<scope>provided</scope>
</dependency>
或者sbt方式:
"com.qiniu" %% "transform-plugin-sdk" % 1.0.1 % "provided"
或者gradle方式:
configurations {
provided
}
sourceSets {
main { compileClasspath += configurations.provided }
}
dependencies {
provided 'com.qiniu:transform-plugin-sdk:1.0.1'
}
- 自定义
plugin需要继承JavaParser抽象类, JavaParser example。 - 重写
JavaParser中parse方法,parse方法支持单行到单行或者多行间的转换,函数返回值类型为List<Row>。 - 注意: parse的返回值
List<Row>中每行数据由TransformSpec中plugin的output所有字段所对应的值组成,且这些字段的组织顺序必须与TransformSpec中plugin的output中字段顺序保持一致,例如:
TransformSpec中plugin如下, 其中com.qiniu.AppendTwoFieldsJavaParser功能为返回2个字段:
"plugin": {
"name": "com.qiniu.AppendTwoFieldsJavaParser",
"output": [
{
"name":"Field1",
"type":"string"
},{
"name":"Field2",
"type":"string"
}
]
}
那么parse 返回值parseResult(类型为List<Row>)为:
// 1.根据plugin中output的字段顺序Field1和Field2,因此每行输出数据为Field1和Field2相应值Value1和Value2组成的List
List<Object> resultRow = new ArrayList<Object>();
resultRow.add(Value1);
resultRow.add(Value2);
// 2.由于本parse方法是单行到单行间的转换,最终返回的 `parseResult`的size为1,即只包含resultRow
List<Row> parseResult = new ArrayList<Row>();
parseResult.add(new GenericRow(resultRow.toArray()));,上面parse是单行到单行间的转换,因此输出的parseResult中仅包含一行数据,该行数据需要按一定顺序组织output中所有字段。如果对于是单行到N行转换的parse,则输出的parseResult中包含N行数据,其中每行数据需要按一定顺序组织output中所有字段。
- 自定义
plugin需要继承ScalaParser抽象类,ScalaParser example。 - 重写
ScalaParser中parse方法,parse方法支持单行到单行或者多行间的转换,函数返回值类型为Seq[Row]。 - 注意: parse的返回值
Seq[Row>中每行数据由TransformSpec中plugin的output所有字段所对应的值组成,且这些字段的组织顺序必须与TransformSpec中plugin的output中字段顺序保持一致,例如:
TransformSpec中plugin如下, 其中com.qiniu.AppendTwoFieldsScalaParser功能为返回2个字段:
"plugin": {
"name": "com.qiniu.AppendTwoFieldsScalaParser",
"output": [
{
"name":"Field1",
"type":"string"
},{
"name":"Field2",
"type":"string"
}
]
}
那么parse 返回值parseResult(类型为Seq[Row])为:
// 1.根据plugin中output的字段顺序Field1和Field2,因此每行输出数据为Field1和Field2相应值Value1和Value2组成的Seq
val resultRow = Seq(Value1, Value2)
// 2.由于本parse方法是单行到单行间的转换,最终返回的 `parseResult`的size为1,即只包含resultRow
val parseResult = Seq(Row.fromSeq(resultRow)),上面parse是单行到单行间的转换,因此输出的parseResult中仅包含一行数据,该行数据需要按一定顺序组织output中所有字段。如果对于是单行到N行转换的parse,则输出的parseResult中包含N行数据,其中每行数据需要按一定顺序组织output中所有字段。
- 编写Parser类注意事项,假设现需要用Java和Scala分别编写
UserDefinedJavaParser和UserDefinedScalaParser两种plugin:
Java
class UserDefinedJavaParser extends JavaParser{
// 1. UserDefinedJavaParser需要提供如下构造函数
public UserDefinedJavaParser(Integer order, List<String> pluginFields, StructType schema, Map<String, Serializable> configuration) {
super(order, pluginFields, schema, configuration);
}
// 2. 根据Java语言编写plugin的说明实现parse方法
public List<Row> parse(Row originData) {
}
}Scala
// 1. 与java类似,需要提供如下构造函数
class UserDefinedScalaParser (order: Integer,
pluginFields: Seq[String],
schema: StructType,
configuration: Map[String, JSerializable]) extends ScalaParser(order, pluginFields, schema, configuration) {
// 2. 根据Scala语言编写plugin的说明实现parse方法
override def parse(originData: Row): Seq[Row] = {
}
}
- 获取原始行数据中某个key对应的value:
Java
// 1.首先获取key在原始行数据中的index
int index = getFieldIndex(key);
// 2.原始每行输入数据
List<Object> newRow = new ArrayList<Object>(Arrays.asList(((GenericRow)originData).values()));
// 3.获取key对应的Value值, 其类型Type为源repo中该key的类型,注意当key为date类型时候,类型Type为Long类型。
Type value = (Type)newRow.get(index);Scala
// 1.首先获取key在原始行数据中的index
val index:Int = getFieldIndex(key)
// 2.原始每行输入数据
val orignalRow: Seq[Any] = originData.toSeq
// 3.获取key对应的Value, 其类型Type为源repo中该key的类型,注意当key为date类型时候,类型Type为Long类型。
val value: Type = newRow.get(index).asInstanceOf[Type];- 获取
TransformSpec中plugin的output所有字段:
Java
List<String> pluginOutputs = getPluginFields();Scala
// 注意JList为java中List接口,非Scala中List
val pluginOutputs: JList[String] = getPluginFields