woqutech/o2k

使用QDeoder + Flink实时检测信用卡欺诈交易

woqutech-qdecoder opened this issue · 0 comments

使用QDeoder + Flink实时检测信用卡欺诈交易

0. 背景

在当今数字时代,信用卡欺诈行为越来越被重视。 罪犯可以通过诈骗或者入侵安全级别较低系统来盗窃信用卡卡号。 用盗得的信用卡进行很小额度的例如一美元或者更小额度的消费进行测试。 如果测试消费成功,那么他们就会用这个信用卡进行大笔消费,来购买一些他们希望得到的,或者可以倒卖的财物。

本文将介绍如何建立一个针对可疑信用卡交易行为的反欺诈检测系统。通过本文你将了解到:

  • QDecoder 如何快速获取oracle数据库的交易数据。
  • Flink 如何为我们实现复杂业务逻辑并实时执行。

QDecoder是沃趣科技研发的、基于Oracle redo日志进行二进制反解析工具,能够及时将oracle数据库的变更解析出来,形成binlog;解析的结果以canal的protobuf的形式直接写入到kafka或者socket。关于QDecoder的更多介绍,请参考:https://hub.docker.com/r/woqutech/qdecoder

Apache Flink 提供了 DataStream API 来实现稳定可靠的、有状态的流处理应用程序。 Flink 支持对状态和时间的细粒度控制,以此来实现复杂的事件驱动数据处理系统。

下面将介绍基于QDecoder + Flink实现信用卡欺诈交易实时检测报警程序。

1. 架构

架构图

QDecoder实时获取Oracle的redo log,将感兴趣的数据解析出来,写入kafka。
Flink程序(FraudDetection)使用flink-connector-kafka实时从kafka获取交易数据,使用com.alibaba.otter.canal.protocol反序列化binlog,然后进行流式计算,识别出可能的欺诈交易,并输出警告。

使用QDecoder解析Oracle的redo log,具有以下优点:

  • QDecoder只须读取在线日志文件和归档日志文件,不会更改任何oracle的数据,对用户没有任何影响,非常安全可靠。
  • QDecoder对用户事务没有侵入性,无须更改应用逻辑,无须针对QDecoder做任何特殊的schema设计。
  • QDecoder能实时地解析redo log,保证下游程序能及时获得最新的交易数据。
  • QDecoder支持oracle的各种高级功能,保证解析出的数据完整一致。
  • QDecoder简单易用,几分钟就可以配置好。

2. 配置并启动QDecoder

2.1 在oracle数据库中准备交易表格:account

create user qdecoder identified by qdecoder default tablespace USERS quota unlimited on USERS;
grant connect,resource to qdecoder;
conn qdecoder/qdecoder;
create table account(accountid int primary key, balance number);

2.2 配置并启动QDecoder

运行docker命令,即可启动QDecoder:

docker run -it --name=qdecoder -p 9191:9191 -p 9092:9092 --pull always registry.cn-hangzhou.aliyuncs.com/woqutech/qdecoder

注意:如果docker版本比较低,不支持--pull always选项,则请手动更新docker image,保证运行qdecoder的最新版本:

docker pull registry.cn-hangzhou.aliyuncs.com/woqutech/qdecoder
docker run -it --name=qdecoder -p 9191:9191 -p 9092:9092 registry.cn-hangzhou.aliyuncs.com/woqutech/qdecoder

根据提示配置QDecoder,更多信息可参考 https://hub.docker.com/r/woqutech/qdecoder

以下配置需要特别注意一下:

  • 配置项1.1中列出的sql,请以dba权限在oracle中执行,这将配置QDecoder查询系统表需要的权限。
  • 配置项2.1: 输入将要检测的表:qdecoder.account
  • 配置项3.1: 选择输出到kafka, bootstrap.servers可以不输入,直接在容器中启动kafka。account表的变更将写入topic: defaultapp.qdecoder.binlog.qdecoder

配置如下:
配置示例

等QDecoder启动后,可以按照提示运行binlogdumpK,从kafka读取binlog并打印出来。

现在更新account表,看binlogdumpK的输出:

insert into account values(1,10000);
insert into account values(2,20000);
insert into account values(3,30000);
commit;

如果网络比较快,应该马上就能看到binlogdumpK输出了相应的binlog。

注意: binlogdumpK只是为了观察一下QDecoder的输出,你可以随时关掉它,这并不影响QDecoder和Flink程序的运行。

现在,QDecoder已经正常工作了,接下来,我们写一个Flink程序,读取binlog并检测欺诈交易。

3. 编写Flink程序进行欺诈检测

下面的Flink程序将检测account表的每一笔交易,若发现一个帐户在1分钟内,先出现了一笔小交易(小于1),后面又出现了一笔大交易(大于500),则认为出现了欺诈交易,立即输出警告。

完整代码在github: https://github.com/woqutech/qdecoder/tree/main/FlinkSample/frauddetection

3.1 下载代码

git clone https://github.com/woqutech/qdecoder.git
cd qdecoder/FlinkSample/frauddetection

3.2 运行程序

frauddetection是一个maven创建的项目,有pom.xml项目文件,可以导入各种IDE,进行调试和运行。

3.2.1 用intellij IDEA运行frauddetection程序

打开项目

开始界面:open or import -> 选择frauddetection目录
或者
菜单: file/open -> 选择frauddetection目录

运行程序

菜单: run -> run 'FraudDetectionJob'

注意:如果报告slf4j重复,且有大量的log输出,请在module/dependencies中删除ch.qos.logback:logback-classic和ch.qos.logback:logback-core。

菜单: file -> Project Structure -> Modules -> dependencies, 删除上述依赖。

3.3 更新account.balance,模拟交易,观察Flink程序的输出

update account set balance = balance - 0.1 where accountid = 1;
commit;
update account set balance = balance - 0.2 where accountid = 1;
commit;
update account set balance = balance + 100 where accountid = 2;
commit;
update account set balance = balance - 501 where accountid = 1;
commit;
update account set balance = balance - 200 where accountid = 2;
commit;

在一分钟内,account-1先出现了小于1的变更,后面又出现了大于500的变更,则识别为欺诈事务。
执行完上述SQL,Flink程序会立即输出:

21:11:20,107 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=1}

表示accountid=1的帐号检测到欺诈交易。

3.4 代码解析

Flink程序(FraudDetection)共有三个类,位于src/main/java/spendreport:

  • FraudDetectionJob: Flink程序主控类,main方法中组装并运行了Flink程序。
  • BinlogTransactionSchema: 反序列化QDecoder输出的binlog,生成Transaction对象。
  • FraudDetector: 进行流式计算,检测欺诈。

程序的基本框架来自 https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/try-flink/datastream/,所不同的是,这里使用flink-connector-kafka从kafka读取QDecoder写入的binlog,然后用com.alibaba.otter.canal.protocol反序列化并组装成Transaction对象,最后交给FraudDetector进行处理,识别欺诈交易。

由于QDecoder输出的binlog采用了和阿里巴巴的canal兼容的格式,所以可以直接使用canal.protocol包反序列化之。

使用外部组件的方法也很简单,在pom.xml中加入依赖即可:

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_2.11</artifactId>
			<version>1.14.3</version>
		</dependency>

		<dependency>
			<groupId>com.alibaba.otter</groupId>
			<artifactId>canal.protocol</artifactId>
			<version>1.1.3</version>
		</dependency>

3.4.1 FraudDetectionJob.main:

main方法中组装Flink程序:

  • 创建Flink流执行环境:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • 创建FlinkKafkaConsumer,为Flink程序提供数据流:
FlinkKafkaConsumer<Transaction> kafkaSource = new FlinkKafkaConsumer<Transaction>("defaultapp.qdecoder.binlog.qdecoder", new BinlogTransactionSchema(), properties);
  • 将kafka数据源加入执行环境:
DataStream<Transaction> transactions = env
				.addSource(kafkaSource)
				.name("transactions");
  • 为数据流增加处理功能:用FraudDetector分析每一笔交易,按accountid分区,并行计算
DataStream<Alert> alerts = transactions
			.keyBy(Transaction::getAccountId)
			.process(new FraudDetector())
			.name("fraud-detector");
  • 最后,启动Flink程序
env.execute("Fraud Detection");

完整代码如下:

public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 创建FlinkKafkaConsumer,用BinlogTransactionSchema反序列化
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
		properties.setProperty("group.id", "flink.test");
		
		FlinkKafkaConsumer<Transaction> kafkaSource = new FlinkKafkaConsumer<Transaction>("defaultapp.qdecoder.binlog.qdecoder", new BinlogTransactionSchema(), properties);
		kafkaSource.setStartFromEarliest();

		DataStream<Transaction> transactions = env
				.addSource(kafkaSource)
				.name("transactions");

		DataStream<Alert> alerts = transactions
			.keyBy(Transaction::getAccountId)
			.process(new FraudDetector())
			.name("fraud-detector");

		alerts
			.addSink(new AlertSink())
			.name("send-alerts");

		env.execute("Fraud Detection");
	}

3.4.2 BinlogTransactionSchema.deserialize:

使用com.alibaba.otter.canal.protocol反序列化binlog, 计算balance的变化,生成org.apache.flink.walkthrough.common.entity.Transaction对象。

主要代码如下:

// 反序列化Entry
CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(binlog);
// 获取表名
entry.getHeader().getTableName();
// 获取Entry type: ROWDATA|TRANSACTIONBEGIN|TRANSACTIONEND|...
entry.getEntryType();
// 获取row change
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
// 获取Event type: INSERT|UPDATE|DELTE|...
rowChange.getEventType();
// 获取执行时间
long executeTimeMs = entry.getHeader().getExecuteTime();
// 获取row data
CanalEntry.RowData rowData = rowChange.getRowDatas(0);

// 获取旧值
for (CanalEntry.Column col : rowData.getBeforeColumnsList()) {
    if (col.getName().equalsIgnoreCase("accountid")) {
        oldRow.accountId = Long.parseLong(col.getValue());
    } else if (col.getName().equalsIgnoreCase("balance")) {
        oldRow.balance = Double.parseDouble(col.getValue());
    }
}

// 获取新值
for (CanalEntry.Column col : rowData.getAfterColumnsList()) {
    if (col.getName().equalsIgnoreCase("accountid")) {
        newRow.accountId = Long.parseLong(col.getValue());
    } else if (col.getName().equalsIgnoreCase("balance")) {
        newRow.balance = Double.parseDouble(col.getValue());
    }
}

// 创建transaction
new Transaction(newRow.accountId, executeTimeMs, Math.abs(newRow.balance-oldRow.balance));

3.4.3 FraudDetector.processElement

处理每一个Transaction对象,识别同一帐号可能存在的欺诈交易。更多细节请参考https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/try-flink/datastream/

4. 总结

以上示例展示了QDecoder在Flink流式计算中的基本用法:

使用QDecoder实时地获取oracle的变更,并将变更写入kafka;然后由Flink从kafka获取交易详情,进行实时计算,及时输出计算结果。

对于拥有大量oracle交易数据库的实时交易系统来讲,使用QDecoder快速获取数据变更,为后端的大数据计算提供了高效、稳定的数据流。
QDecoder简单易用的特性,以及对数据库和应用都无侵入的设计,非常易于搭建基于oracle数据源的大数据分析系统。