白名单实现分享——ES、bitmap

平台—新白名单

一、背景

平台,基于白名单制,原有流程:

  1. 运营后台导入用户pin列表,把这一批用户分配给产品(一个或多个);
  2. 现有pin列表,根据id范围分配给产品(一个或多个);

原有白名单分配流程,操作较简单,但由于用户质量参差不齐,运营同学无法精准的通过增减白名单,来控制产品的进件人数,同时,用户在风控侧有大量的风控数据,根据用户风险的变化情况,来给产品增减白名单,从而达到精准运营和风险控制。

在这样的背景下,新白名单的基本诉求:

  1. 根据风控指标筛选,并支持按风控规则比例分配名单
  2. 支持实时进行本产品或不同产品间,进行用户的排斥筛选
  3. 能按照指定的方式,全量或部分删减已经配置的白名单列表

目前这些功能已经全部实现。

二、核心流程

流程图

三、基础知识

3.1 ES索引结构

流程图

3.2 ES数据展示

流程图

3.3 ES与mysql对比

流程图

3.4 倒序索引

流程图

3.5 采用ES的原因

  1. 大数据量存储(集群、分片、备份片)
  2. 类似sql的查询,支持各种条件筛选(=、!=、in、is null、like、not like)(sum、max、min)
  3. 检索速度快(倒序索引)
  4. 其他:大数据量深度分页(scroll)、批量处理(bulk)

3.6 Redis-bitmap展示

流程图

3.7 Redis-bitmap本质

  1. redis中最基本的数据结构:key-value
  2. value对应的是二进制数组
  3. redis提供丰富api:setbit、getbit、bitcount、bitpos

3.8 采用bitmap原因

  1. 占用空间较少(目前1.7亿用户,大约占用30G+,即一千万占用约2G)
  2. 查询速度快(redis基于内存,查询速度ms级,适用于借钱平台大数据量访问)

3.9 使用场景(适用于结果非0即1的场景)

  1. 白名单场景
  2. 用户签到统计(用户为key,日期为value数组)
  3. 日活用户统计(日期为key,用户数字id为value数组)
  4. …(大家补充)

四、业务相关

4.1 为什么需要同步?

利用”有则更新,无则插入”规则,不同于mysql的是,ES基于的是id,且整条记录替换,而mysql是根据唯一索引,可指定具体字段
我们(风控)就是根据使用mysql的经验,以为可以支持部分字段(风控标签字段)的更新,最后线上验证时发现问题,风控推送更新数据,覆盖掉了我们的业务字段(产品列表、用户序号等),后引入“同步方案”。

4.2 为什么需要先插入“用户_产品维度的ES索引”,后更新操作?

提前告诉答案:保证增加(减少)白名单,要么全部成功,要么全部失败。具体如何保证,下文介绍,此处介绍如果不使用这个中间索引的后果:

1
2
3
4
5
6
1. 顺带的作用:如果R2M出现问题,用户白名单数据都丢失了,可以根据该索引数据,进行恢复
2. 增加白名单场景:[直接从用户标签ES中获取用户,直接反更新“产品列表”字段、redis位图]
----2.1 产品1风控规则:A等级用户 80%;B等级用户 20%
----2.2 某次给产品1添加白名单,总数1000人,其中A等级800人,B等级200人
----2.3 刚给操作100人,异常了。此时,成功100人都是等级A。
----2.4 相当于此后,该产品的风控用户比例不满足8:2【风控规则,为强制要求】

4.3 为什么ES中“产品列表”字段需要加“|”?

流程图

1
2
3
4
5
6
7
8
9
> 如果有产品编码:ccb,wfccb
> =--------------
> 张三 “ccb,wfccb”
> 李四 “ccb”
> 王五 “wfccb”
> =--------------
> select * from table where assignproduct not like "%cbb%";
> 预期结果:王五
> 执行结果:[空]

4.4 一次添加白名单的全流程

流程图

4.5 白名单现状

  • 产品历史白名单清洗完成后,总用户数在1.7亿左右,产品白名单在18亿左右。
  • 运营同学已经开始使用新功能进行白名单的增减操作,上周五完成3700万的名单添加(今天预计会增加2000万)
  • R2M空间占用,也由原来的80G左右,缩减到30G
  • 借钱平台,用户白名单列表查询已经切到新的R2M查询(TP99 基本在3ms以内,TP99基本在1.5ms左右)

4.6 待改进项

  • 增减白名单的效率问题(线上环境,一小时500万,但是多个产品操作时,会有所下降,300-400万/小时)
  • 主要原因:
    (1)ES集群处理瓶颈(目前数据放在JES的一个公共集群,资源较有限)
    (2)需要保证数据处理结果的严谨性,部分环节无法发挥异步或并发处理的优势(后续可以优化流程)

五、代码展示

5.1 ES

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
package com.jd.jr.dubhe.ctrl.center.task.show;
import com.jd.jr.dubhe.ctrl.center.task.BaseTest;
import com.jd.jr.dubhe.ctrl.center.task.api.enums.DubheUserRiskLabelFieldEnum;
import com.jd.jr.dubhe.ctrl.center.task.api.enums.ProcessFlagEnum;
import com.jd.jr.dubhe.ctrl.center.task.api.enums.UserStatusEnum;
import com.jd.jr.jes.client.EsClientFactory;
import com.wangyin.commons.util.Logger;
import com.wangyin.commons.util.LoggerFactory;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.junit.Test;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
* @Describe: ES相关操作
* JES-wiki:http://wiki.cbpmgt.com/confluence/pages/viewpage.action?pageId=20589182
* @Author: liuke
* @Date: 2018/12/19 12:38
*/
public class ESTest extends BaseTest {
public static final Logger logger = LoggerFactory.getLogger(ESTest.class);
public static final String INDEX = "dubhe_user_risk_label";
public static final String TYPE = "dubhe";
public static final String URL = "http://172.24.5.78:8083";
public static final String APPNAME = "dubhe_ctrlcenter_task";
public static final String SECRET = "122ac7fa73f2e8cae7b4e9c68ba9908d";
public static final String PREFIX_PIN = "show_pin_12121_";
public static final String PRODUCT_CODE_PROTECT = "|";
public static Client client;
static {
//获取客户端
EsClientFactory esClientFactory = new EsClientFactory(URL, APPNAME, SECRET);
try {
client = esClientFactory.createClient();
} catch (Exception e) {
logger.info("异常:", e);
}
}
/**
* insert
* <p>
* select * from dubhe_user_risk_label where userid = "show_pin_12121_0001"
*/
@Test
public void test1() throws Exception {
XContentBuilder builder = jsonBuilder()
.startObject()
.field("accounttype", "JDPIN")
.field("usergrade", "B")
.field("userid", PREFIX_PIN + "0001")
.endObject();
String json = builder.toString();
System.out.println("============insert:" + json);
//插入数据,指定id:PREFIX_PIN+"0001"
//注意:如果id=1的记录存在,则更新【主键的有则更新,无则插入】【整个文档覆盖】
IndexResponse response = client.prepareIndex(INDEX, TYPE, PREFIX_PIN + "0001")
.setSource(builder)
.get();
}
/**
* 根据id查询
*/
@Test
public void test2() {
GetResponse response = client.prepareGet(INDEX, TYPE, PREFIX_PIN + "0001").get();
System.out.println("============query_result:" + response.getSourceAsString());
}
/**
* 更新
* select * from dubhe_user_risk_label where userid = "show_pin_12121_0001"
*/
@Test
public void test7() throws Exception {
UpdateRequest updateRequest = new UpdateRequest(INDEX, TYPE, PREFIX_PIN + "0001")
.doc(jsonBuilder()
.startObject()
.field("usergrade", "A")
.endObject());
client.update(updateRequest).get();
}
/**
* 删除
*/
@Test
public void test8() {
DeleteResponse response = client.prepareDelete(INDEX, TYPE, PREFIX_PIN + "0001").get();
}
/**
* 多条件查询
* select * from dubhe_user_risk_label where userstatus = 1 and userserial>=10 and userserial<=100 and assignproduct not like "%|ccb|%" limit 0,10
*/
@Test
public void test3() {
String methodDesc = "ESTest|test3||";
//1.拼接where条件
BoolQueryBuilder bqb = QueryBuilders.boolQuery();
//用户状态
QueryBuilder userStatusQb = QueryBuilders.termQuery(DubheUserRiskLabelFieldEnum.USERSTATUS.getCode(), UserStatusEnum.VALID.getCode());
bqb.must(userStatusQb);//添加筛选条件
//范围
RangeQueryBuilder userSerialRqb = QueryBuilders.rangeQuery(DubheUserRiskLabelFieldEnum.LENDERLIMIT.getCode())
.from(1L, true)
.to(5L, true);
bqb.filter(userSerialRqb);//添加筛选条件
//产品去重:not like
QueryBuilder prodCodeQb = QueryBuilders.wildcardQuery(DubheUserRiskLabelFieldEnum.ASSIGNPRODUCT.getCode(), "*" + PRODUCT_CODE_PROTECT + "ccb" + PRODUCT_CODE_PROTECT + "*");
bqb.mustNot(prodCodeQb);
//2.查询es
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(INDEX).setTypes(TYPE);
searchRequestBuilder.setQuery(bqb).setFrom(0).setSize(10);//返回一条数据
logger.info(methodDesc, "【请求ES】|ES请求参数:", searchRequestBuilder.toString());
SearchResponse searchResponse = searchRequestBuilder.get();
logger.info(methodDesc, "【请求ES-返回】|返回searchResponse:", searchResponse);
SearchHits searchHits = searchResponse.getHits();
for (SearchHit hit : searchHits.getHits()) {
//打印返回一条数据
logger.info(methodDesc, "【请求ES-返回一条数据】|一条数据:", hit.getSourceAsString());
}
Long count = searchHits.getTotalHits();
logger.info(methodDesc, "【请求ES-返回总数】|count:", count);
}
/**
* scroll-游标
* select * from dubhe_user_risk_label where userserial >= 1 and userserial <= 10 AND processflag=1
*/
@Test
public void test5() {
String methodDesc = "ESTest|test5||";
//1.拼接where条件
BoolQueryBuilder bqb = QueryBuilders.boolQuery();
QueryBuilder processFlagQb = QueryBuilders.termQuery(DubheUserRiskLabelFieldEnum.PROCESSFLAG.getCode(), ProcessFlagEnum.PROCESSED.getCode());
bqb.must(processFlagQb);//添加筛选条件
//范围
/* RangeQueryBuilder userSerialRqb = QueryBuilders.rangeQuery(DubheUserRiskLabelFieldEnum.USERSERIAL.getCode())
.from(1L, true)
.to(1000L, true);
bqb.filter(userSerialRqb);//添加筛选条件*/
logger.info(methodDesc, "ES-Scroll操作|BoolQueryBuilder:", bqb);
//返回的字段
String[] fields = {DubheUserRiskLabelFieldEnum.USERID.getCode(),
DubheUserRiskLabelFieldEnum.ACCOUNTTYPE.getCode(),
DubheUserRiskLabelFieldEnum.PROCESSFLAG.getCode()};
//4.scroll
SearchResponse scrollResp = getScroll(INDEX, TYPE, bqb, fields);
logger.info(methodDesc, "ES-Scroll操作-首次获取的条数:", scrollResp.getHits().getHits().length);
//计数器
Long stats = 0l;
Long userSerialNo = 0l;
do {
for (SearchHit hit : scrollResp.getHits().getHits()) {
//logger.debug(methodDesc, "数据内容|id:", hit.getId(), "|内容:", hit.getSourceAsString());
//todo 数据处理
}
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(TimeValue.timeValueSeconds(Long.valueOf(5l))).execute().actionGet();
logger.info(methodDesc, "ES-Scroll操作-非首次获取的条数:", scrollResp.getHits().getHits().length);
} while (scrollResp.getHits().getHits().length != 0);
//5.清理scroll
ClearScrollRequest request = new ClearScrollRequest();
request.addScrollId(scrollResp.getScrollId());
client.clearScroll(request);
logger.info(methodDesc, "ES-Scroll操作-完成|处理条数:", stats, "|最终序号:", userSerialNo);
}
/**
* bulk-批量操作
* select * from dubhe_user_risk_label where userid like "show_pin_12121_%"
*/
@Test
public void test6() throws Exception {
BulkProcessor bulk = getBulkProcessor();
for (int i = 1; i < 100; i++) {
String userId = PREFIX_PIN + "00000000000" + i;
XContentBuilder builder = jsonBuilder()
.startObject()
.field("userid", userId)
.field("accountType", "JDPIN")
.endObject();
bulk.add(new IndexRequest(INDEX, TYPE).source(builder));
}
//bulk清理
bulk.flush();
bulk.close();
client.admin().indices().prepareRefresh(INDEX).get();
}
/**
* upsert : 有则更新(支持部分字段更新),无则插入
* select * from dubhe_user_risk_label where userid = "show_pin_12121_00011111"
*/
@Test
public void test4() throws Exception {
BulkProcessor bulk = getBulkProcessor();
String userId = PREFIX_PIN + "00011111";
String flag = "----";
XContentBuilder builder = jsonBuilder()
.startObject()
.field("userid", userId)
.field("accounttype", "JDPIN" + flag)
.field("usersource", "FK" + flag)
.field("usergrade", "A")
//.field("userstatus", "1")
.endObject();
//upsert
bulk.add(new UpdateRequest(INDEX, TYPE, userId).doc(builder).docAsUpsert(true));
}
/**
* 聚合函数:max
*/
@Test
public void test9() {
String methodDesc = "ESTest|test9||";
String userSerialMax = "userSerialMax";
AggregationBuilder aggBuilder = AggregationBuilders.max(userSerialMax).field(DubheUserRiskLabelFieldEnum.USERSERIAL.getCode());
BoolQueryBuilder bqb = QueryBuilders.boolQuery();
QueryBuilder processFlagQb = QueryBuilders.termQuery(DubheUserRiskLabelFieldEnum.PROCESSFLAG.getCode(), ProcessFlagEnum.PROCESSED.getCode());
bqb.must(processFlagQb);//添加筛选条件
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(INDEX).setTypes(TYPE)
.setQuery(bqb)
.addAggregation(aggBuilder)
.setFrom(0)
.setSize(1);
logger.info(methodDesc, "【请求ES】|ES请求对象:", searchRequestBuilder.toString());
SearchResponse searchResponse = searchRequestBuilder.get();
logger.info(methodDesc, "【请求ES-返回】|返回searchResponse:", searchResponse);
SearchHits searchHits = searchResponse.getHits();
/*for (SearchHit hit : searchHits.getHits()) {
//打印返回一条数据
//logger.info(methodDesc, "【请求ES-返回】|数据:", hit.getSourceAsString());
}*/
Max valueCount = searchResponse.getAggregations().get(userSerialMax);
Double max = valueCount.getValue();
logger.info(methodDesc, "【请求ES-返回总数】|max:", max);
}
/**
* 使用sql查询
* todo liuke 支持不完善【目前jar为快照版,不推荐线上使用】
*
* @throws Exception
*/
public void testEsSql() throws Exception {
String query = "select count(*) as num from dubhe_user_risk_label where userGrade='A'";
//SqlElasticSearchRequestBuilder select = (SqlElasticSearchRequestBuilder) searchDao.explain(query).explain();
//SearchResponse response = (SearchResponse) select.get();
//Aggregations agg = response.getAggregations();
//InternalValueCount agg2 = agg.get("num");
//System.out.println("=====统计总数:" + agg2.getValue());
}
//============================================================================//
public static BulkProcessor getBulkProcessor() {
//创建BulkProcessor
BulkProcessor bulkProcessor = BulkProcessor.builder(
client,
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) {
System.out.println("===============beforeBulk");
}
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) {
System.out.println("===============afterBulk_sucess");
}
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) {
System.out.println("===============afterBulk_fail:" + failure);
}
})
.setBulkActions(10000)//超过10000条,提交服务端
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//5M,提交服务端
.setFlushInterval(TimeValue.timeValueSeconds(5))//5s,刷新一次,提交服务端
//以上3个条件任何一个满足,则,提交服务端
.setConcurrentRequests(0)//默认1,表示积累和发送是异步的。https://blog.csdn.net/wslyk606/article/details/79413980
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//重试策略
.build();
return bulkProcessor;
}
private static SearchResponse getScroll(String index, String type, BoolQueryBuilder bqb, String[] fields) {
SearchResponse scrollResp = client.prepareSearch(index).setTypes(type)
.setScroll(TimeValue.timeValueSeconds(3l))//结果保存时间
.setVersion(true)
.setQuery(bqb)
.setFetchSource(fields, null)
.setSize(Integer.valueOf(10000))//每次获取的条数
.get(); //执行
return scrollResp;
}
}

5.2 Redis-bitmap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
* @Describe: Redis-bitmap-test
* @Author: liuke
* @Date: 2018/12/19 12:01
*/
public class RedisBitMapTest extends BaseTest {
private static final Logger logger = LoggerFactory.getLogger(RedisBitMapTest.class);
@Resource
CacheClusterClient cacheClusterClient;
@Test
public void test() {
String key1 = "bitmap-test-1";
String key2 = "bitmap-test-2";
//1.获取"jd"的二进制:
System.out.println("=============获取\"jd\"的二进制:");
cacheClusterClient.set(key1, "jd");
byte[] value = cacheClusterClient.get(key1.getBytes());
BitSet set = byteArray2BitSet(value);
System.out.println("==============size:" + set.size());
for (int i = 0; i < set.size(); i++) {
System.out.println(i + ":" + set.get(i));
}
//2.演示:setbit
System.out.println("=============演示:setbit:");
cacheClusterClient.setbit(key2, 1, "1");
cacheClusterClient.setbit(key2, 2, "1");
cacheClusterClient.setbit(key2, 4, "1");
cacheClusterClient.setbit(key2, 6, "1");
cacheClusterClient.setbit(key2, 9, "1");
cacheClusterClient.setbit(key2, 10, "1");
cacheClusterClient.setbit(key2, 13, "1");
System.out.println("=============演示:setbit——结果:" + cacheClusterClient.get(key2));
//3.演示:getbit
System.out.println("演示:getbit:");
System.out.println(cacheClusterClient.getbit(key2, 1));
System.out.println(cacheClusterClient.getbit(key2, 2));
System.out.println(cacheClusterClient.getbit(key2, 4));
System.out.println(cacheClusterClient.getbit(key2, 6));
System.out.println(cacheClusterClient.getbit(key2, 9));
System.out.println(cacheClusterClient.getbit(key2, 10));
System.out.println(cacheClusterClient.getbit(key2, 13));
System.out.println("============");
System.out.println(cacheClusterClient.getbit(key2, 3));
System.out.println(cacheClusterClient.getbit(key2, 5));
System.out.println(cacheClusterClient.getbit(key2, 7));
System.out.println(cacheClusterClient.getbit(key2, 64));
System.out.println(cacheClusterClient.getbit(key2, 6400));
//3.演示:bitcount
System.out.println("演示:bitcount【位图中1的个数】(预期7个),结果:" + cacheClusterClient.bitcount(key2));
//4.高级-bitcount
System.out.println("演示:bitcount【第一个字节中1的个数】(预期4个),结果:" + cacheClusterClient.bitcount(key2, 0, 0));
System.out.println("演示:bitcount【第二个字节中1的个数】(预期3个),结果:" + cacheClusterClient.bitcount(key2, 1, 1));
//5.演示bitpos
System.out.println("演示:bitpos【第一个1出现的位置】,结果:" + "【【【R2m不支持】】】"/*cacheClusterClient.bitpos(key2,1)*/);
System.out.println("演示:bitpos【第一个0出现的位置】,结果:" + "【【【R2m不支持】】】"/*cacheClusterClient.bitpos(key2,0)*/);
}
/**
* byte转为bitset
*
* @param bytes
* @return
*/
public static BitSet byteArray2BitSet(byte[] bytes) {
BitSet bitSet = new BitSet(bytes.length * 8);
int index = 0;
for (int i = 0; i < bytes.length; i++) {
for (int j = 7; j >= 0; j--) {
bitSet.set(index++, (bytes[i] & (1 << j)) >> j == 1 ? true : false);
}
}
return bitSet;
}
}

5.3 java并发包-java.util.concurrent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* @Describe: java.util.concurrent:线程池、CountDownLatch
* @Author: liuke
* @Date: 2018/12/19 11:47
*/
public class ConcurrentTest {
private static final Logger log = LoggerFactory.getLogger(ConcurrentTest.class);
/**
* 线程池
*/
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue());
private static final Integer THREAD_NUM = 10;
public static void main(String[] args) throws Exception {
String methodDesc = "ConcurrentTest|main|学习java.util.concurrent:线程池、CountDownLatch|";
try {
CountDownLatch cdl = new CountDownLatch(THREAD_NUM);
for (int i = 0; i < THREAD_NUM; i++) {
final Map<String, Integer> processSerial = Maps.newHashMap();
processSerial.put("processSerial", i);
//开启10个线程处理
log.info(methodDesc, "开启线程:", processSerial.get("processSerial"));
executor.submit(new Runnable() {
@Override
public void run() {
//biz
biz(processSerial, cdl);
}
});
}
//CountDownLatch,等待10个线程执行完【成功 or 异常】,然后进行验证是否处理成功
cdl.await(100, TimeUnit.SECONDS);//超时退出,一般不会出现这种情况
log.info(methodDesc, "【===========线程执行完毕】");
} catch (Exception e) {
log.info(methodDesc, "异常|e:", e);
}
}
/**
* 业务逻辑
*
* @param processSerial
* @param cdl
*/
private static void biz(Map<String, Integer> processSerial, CountDownLatch cdl) {
String methodDesc = "ConcurrentTest|biz|业务逻辑|";
try {
log.info(methodDesc, "线程执行:", processSerial.get("processSerial"));
//todo 业务逻辑
Thread.sleep(5000);
} catch (Exception e) {
log.info(methodDesc, "异常|e:", e);
} finally {
//finally里面执行
cdl.countDown();
log.info(methodDesc, "线程:", processSerial.get("processSerial"), "执行-完成");
}
}
}
文章目录
  1. 1. 平台—新白名单
    1. 1.1. 一、背景
    2. 1.2. 二、核心流程
    3. 1.3. 三、基础知识
      1. 1.3.1. 3.1 ES索引结构
      2. 1.3.2. 3.2 ES数据展示
      3. 1.3.3. 3.3 ES与mysql对比
      4. 1.3.4. 3.4 倒序索引
      5. 1.3.5. 3.5 采用ES的原因
      6. 1.3.6. 3.6 Redis-bitmap展示
      7. 1.3.7. 3.7 Redis-bitmap本质
      8. 1.3.8. 3.8 采用bitmap原因
      9. 1.3.9. 3.9 使用场景(适用于结果非0即1的场景)
    4. 1.4. 四、业务相关
      1. 1.4.1. 4.1 为什么需要同步?
      2. 1.4.2. 4.2 为什么需要先插入“用户_产品维度的ES索引”,后更新操作?
      3. 1.4.3. 4.3 为什么ES中“产品列表”字段需要加“|”?
      4. 1.4.4. 4.4 一次添加白名单的全流程
      5. 1.4.5. 4.5 白名单现状
      6. 1.4.6. 4.6 待改进项
    5. 1.5. 五、代码展示
      1. 1.5.1. 5.1 ES
      2. 1.5.2. 5.2 Redis-bitmap
      3. 1.5.3. 5.3 java并发包-java.util.concurrent
|