请教group windows的order by问题
caigenxing opened this issue · 1 comments
需要根据1秒的window中的ROWTIME进行排序,但是报如下错误:
Exception in thread "main" org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSort.translateToPlanInternal(StreamExecSort.scala:118)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSort.translateToPlanInternal(StreamExecSort.scala:59)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSort.translateToPlan(StreamExecSort.scala:59)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:91)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
at com.dtstack.flink.sql.exec.FlinkSQLExec.sqlUpdate(FlinkSQLExec.java:94)
at com.dtstack.flink.sql.exec.ExecuteProcessHelper.sqlTranslation(ExecuteProcessHelper.java:235)
at com.dtstack.flink.sql.exec.ExecuteProcessHelper.getStreamExecution(ExecuteProcessHelper.java:169)
at com.dtstack.flink.sql.Main.main(Main.java:41)
at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:140)
我的脚本如下:
CREATE TABLE MyTable(
name varchar,
after varchar,
jstime bigint,
WATERMARK FOR jstime AS withOffset(jstime,1000)
)WITH(
type ='kafka10',
bootstrapServers ='pro1:9092',
kafka.auto.offset.reset ='latest',
topic ='test1',
parallelism ='1',
sourcedatatype ='json'
);
CREATE TABLE result_user_info(
name varchar,
after varchar,
jstime bigint,
ROWTIME datetime
)WITH(
type ='console',
parallelism ='1'
);
insert into result_user_info(name,after,jstime,ROWTIME) (select name ,
after ,
jstime,ROWTIME from MyTable
group by name,after,jstime,TUMBLE(ROWTIME, INTERVAL '1' SECOND),ROWTIME order by ROWTIME)
kafka消息:
{"name":"testname","jstime":3119988231,"after":"{"khh":"我是khh0003"}"}
想请教是否是语法写错了,还是有别的排序方法,多谢
Flink要开启EventTime。你试下