当前位置:首页 > 分享 > 正文

6Flink CEP模拟账号短时间内异地登录风控预警(flink client)

本篇文章我们来模拟一个真实的风险识别场景,模拟XX平台上可能出现盗号行为。

技术实现方案:

(1)通过将xxx平台用户登录时的登录日志发送到kafka(本文代码演示用的socket);

(2)Flink CEP SQL规则引擎中定义好风控识别规则,接入kafka数据源,比如一个账号在5分钟内,在多个不同地区有登录行为,那我们认为该账号被盗;

(3)Flink CEP将识别到的风险数据可以进行下发,为数据应用层提供数据服务,如:风控系统,数据大屏,态势感知.....

(1)我们先来定义一个数据生产者,模拟用户登录,产生登录日志:

package com.producers;import java.io.BufferedWriter;import java.io.IOException;import java.io.OutputStreamWriter;import java.net.ServerSocket;import java.net.Socket;import java.util.Random;/*** Created by lj on 2022-08-10.*/public class Socket_Producer1 {public static void main(String[] args) throws IOException {try {ServerSocket ss = new ServerSocket(9999);System.out.println(启动 server ....);Socket s = ss.accept();BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));String response = java,1,2;//每 2s 发送一次消息int i = 0;Random r=new Random();String[] userArr = {user1,user2,user3,user4,user5,user6,user7,user8,user9};String[] loginIP = {167.234.67.123,219.141.178.14,220.180.239.202,111.73.240.192,123.182.253.242};while(true){Thread.sleep(2000);response= userArr[r.nextInt(userArr.length)] + , + loginIP[r.nextInt(loginIP.length)] +\n;System.out.println(response);try{bw.write(response);bw.flush();i++;}catch (Exception ex){System.out.println(ex.getMessage());}}} catch (IOException | InterruptedException e) {e.printStackTrace();}}}

(2)在CEP中接入日志数据、定义风控规则

package com.examples;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableResult;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.LocalDateTime;import static org.apache.flink.table.api.Expressions.$;/*** Created by lj on 2022-08-10.*/public class CEPSQLSocket1 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStreamSource<String> streamSource = env.socketTextStream(127.0.0.1, 9999,\n);SingleOutputStreamOperator<UserLoginLog> userLoginLog = streamSource.map(new MapFunction<String, UserLoginLog>() {@Overridepublic UserLoginLog map(String s) throws Exception {String[] split = s.split(,);return new UserLoginLog(split[0], split[1], LocalDateTime.now());}});// 将流转化为表Table table = tableEnv.fromDataStream(userLoginLog,$(username),$(ip),$(rowtime1),   //.rowtime()$(pt).proctime());CEP_SQL(env,tableEnv,table);env.execute();}private static void CEP_SQL(StreamExecutionEnvironment env,StreamTableEnvironment tEnv,Table table){System.out.println(===============CEP_SQL=================);tEnv.createTemporaryView(CEP_SQL, table);String sql = SELECT *  +FROM CEP_SQL  +MATCH_RECOGNIZE (  +PARTITION BY username  +ORDER BY pt  +          //在窗口内,对事件时间进行排序。MEASURES  +                   //定义如何根据匹配成功的输入事件构造输出事件e1.username as user1,+First(e1.ip) as first_ip, +LAST(e2.ip) as last_ip, +e1.rowtime1 as rt, +LAST(e2.pt) as end_tstamp  +           //最新的事件时间为end_timestampONE ROW PER MATCH  +                                      //匹配成功输出一条AFTER MATCH  skip to next row  +                   //匹配后跳转到下一行PATTERN ( e1 e2 ) WITHIN INTERVAL '5' MINUTE  +DEFINE  +                                                 //定义在PATTERN中出现的patternVariable的具体含义e1 AS  +e1.username <> '',  +e2 AS  +e1.username = e2.username AND e1.ip <> e2.ip  +) MR;TableResult res = tEnv.executeSql(sql);//        while (res.collect().hasNext()){//            Row next = res.collect().next();//            System.out.println(next);//        }res.print();tEnv.dropTemporaryView(CEP_SQL);}public static class UserLoginLog {public  String username;public  String ip;public LocalDateTime rowtime1;public UserLoginLog(){}public UserLoginLog(String username,String ip,LocalDateTime rowtime){this.username = username;this.ip = ip;this.rowtime1 = rowtime;}}}

(3)启动数据生产者,每2秒模拟一次用户登录行为

(4)启动CEP规则引擎服务,实时显示出现异地登录的用户信息: