jeff-zou/flink-connector-redis

Setting `command` option in TableDescriptor leads to Java error

Closed this issue · 6 comments

sink = (
        TableDescriptor.for_connector("redis")
        .option("host", "redis")
        .option("port", "6379")
        .option("database", "1")
        .option("redis-mode", "single")
        .option("command", "set")
        .schema(
        Schema.new_builder()
            .column("k_", DataTypes.STRING())
            .column("v_", DataTypes.STRING())
            .build())
        .build()
    )
    statement_set.add_insert(sink, table)
    statement_set.attach_as_datastream()

When I tried to do like this, I receive the following error:

Caused by: java.lang.UnsupportedOperationException
        at java.base/java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
        at org.apache.flink.streaming.connectors.redis.table.RedisDynamicTableFactory.createDynamicTableSink(RedisDynamicTableFactory.java:52)
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:319)
        ... 28 more

This looks like we cannot modify a Collection Map type. I traced back the error and seems the following block is the culprit:

if (context.getCatalogTable().getOptions().containsKey(REDIS_COMMAND)) {
            context.getCatalogTable()
                    .getOptions()
                    .put(
                            REDIS_COMMAND,
                            context.getCatalogTable()
                                    .getOptions()
                                    .get(REDIS_COMMAND)
                                    .toUpperCase());
        }

Please help me with this issue.

@jeff-zou any update on this issue so far?

I am sorry to keep you waiting, Please replace 'set' to 'SET' for command. Auto-uppercase is supported only by Table Api.

I am sorry to keep you waiting, Please replace 'set' to 'SET' for command. Auto-uppercase is supported only by Table Api.

I tried that and it does not work. It looks like the code tries to uppercase the command no matter the input. I tried to removed that block of code rebuilt the package but sadly it does not work.

Is it still UnsupportOperationException error?
Please provide the Trace Stack for error, and make sure provide ip for host
.option("host", ip)

I removed that block, recompiled in JAVA 8 and it seems to work. However, the command has to be passed in uppercase (for redis).