1、Background

A SQL statement invoked by the WeChat-customer sync push XXL-Job (syncPushWxUserInfoJobShardingHandler) triggered a slow-query alert.

Tasks:

  • Verify that the modified SQL does not break existing functionality
  • Confirm that the pushed data actually takes effect

2、The Investigation

2.1、Identify the push payload

com.f6car.crm.push.user.BatchUserPusher#push reveals the payload format:

```json
{
  "msgMeta": {
    "type": "U",   // com.f6car.crm.search.index.model.doc.MsgValues#TYPE_UPDATE
    "case": "WF"   // com.f6car.crm.search.index.model.vo.CarMsgFields#CASE_WE_CHAT_FOLLOWER
  },
  "rows": [
    {
      "idCustomer": "14870719207238217425",
      "weChatFollower": 1  // an undeleted official-account follower with a phone number (i.e. a userId exists)
    }
  ]
}
```
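As a rough illustration only (a hypothetical helper, not the actual BatchUserPusher code), the payload above could be assembled with plain string concatenation like this:

```java
// Hypothetical sketch: assembles the "U"/"WF" batch message shown above.
// Field names mirror the payload format; the builder itself is illustrative only.
public class BatchUserMessage {

    /** Builds the JSON body for one customer row (no external JSON library needed). */
    public static String build(String idCustomer, int weChatFollower) {
        return "{"
                + "\"msgMeta\":{\"type\":\"U\",\"case\":\"WF\"},"
                + "\"rows\":[{"
                + "\"idCustomer\":\"" + idCustomer + "\","
                + "\"weChatFollower\":" + weChatFollower
                + "}]}";
    }

    public static void main(String[] args) {
        System.out.println(build("14870719207238217425", 1));
    }
}
```

In the real pusher the body would of course be produced by a JSON serializer rather than by hand; the sketch only pins down which fields travel over the wire.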

2.2、Find the Kafka topic

com.f6car.crm.push.user.BatchUserPusher#topic reveals the topic:

```
test.erp.batchCustomerCar.search
```

2.3、Find the topic listener

All search-related functionality lives in the crm-search project. Searching for the **@KafkaListener** annotation leads to the listener method: com.f6car.crm.search.index.listener.CarCustomerMessageListener#handleBatchMsg

```java
@Autowired
private KafkaMsgRecordHandler carCustomerBatchKafkaHandler;

// Batch-import messages: using the carId in each message, look up the car and
// owner information, then push it to the full-sync message queue for processing
@KafkaListener(topics = "${kafka.topic.car_customer.batch.sync}")
public void handleBatchMsg(List<ConsumerRecord<String, String>> records) {
    carCustomerBatchKafkaHandler.handle(records);
}
```

2.4、Find the elasticsearch index

The next step is to locate the KafkaMsgRecordHandler Spring bean named "carCustomerBatchKafkaHandler". With this by-name injection style the only option is a global text search, which leads to:

com.f6car.crm.search.index.config.index.ConfigCarCustomerIndexFacade#carCustomerBatchKafkaHandler

```java
@Bean
public KafkaMsgRecordHandler carCustomerBatchKafkaHandler(CommonIndexFacade<CarCustomerBatchSyncDoc> carCustomerBatchIndexFacade) {
    return simpleKafkaMsgHandlerFactory.fullSyncMsgHandler(carCustomerBatchIndexFacade);
}
```
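simpleKafkaMsgHandlerFactory.fullSyncMsgHandler adapts an index facade into a Kafka record handler. A minimal sketch of that factory idea, with all type names simplified stand-ins rather than the real crm-search-index classes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the factory idea behind fullSyncMsgHandler:
// adapt a document-level facade into a handler that accepts raw record batches.
public class HandlerFactorySketch {

    /** Stand-in for CommonIndexFacade: indexes a batch of parsed documents. */
    interface IndexFacade {
        void index(List<String> docs);
    }

    /** Stand-in for KafkaMsgRecordHandler. */
    interface MsgHandler {
        void handle(List<String> rawRecords);
    }

    /** Stand-in for SimpleKafkaMsgHandlerFactory#fullSyncMsgHandler. */
    static MsgHandler fullSyncMsgHandler(IndexFacade facade) {
        // The real factory also wires parsing and filtering; forwarding the
        // batch is the structural essence of the adapter.
        return facade::index;
    }

    public static void main(String[] args) {
        List<String> indexed = new ArrayList<>();
        MsgHandler handler = fullSyncMsgHandler(indexed::addAll);
        handler.handle(List.of("{\"idCustomer\":\"1\"}"));
        System.out.println("indexed " + indexed.size() + " doc(s)");
    }
}
```

The adapter shape explains why the @Bean above can be a one-liner: all indexing behaviour lives in the facade passed in.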

Next, follow com.f6car.crm.search.index.config.index.ConfigCarCustomerIndexFacade#carCustomerBatchIndexFacade:

```java
@Bean
public CommonIndexFacade<CarCustomerBatchSyncDoc> carCustomerBatchIndexFacade(CarCustomerBatchSyncParser parser,
        CustomerWechatFullIndexFilter customerWechatFullIndexFilter) {
    CommonIndexFacade<CarCustomerBatchSyncDoc> indexFacade = new CommonIndexFacade<>();
    indexFacade.setDocParser(parser);

    List<IndexFilterProcessor<CarCustomerBatchSyncDoc>> processors = new ArrayList<>();
    processors.add(batchCustomerCarIndexProcessor);
    processors.add(batchNoCarCustomerIndexProcessor);
    IndexFilter<CarCustomerBatchSyncDoc> filter = new SimpleIndexFilter<>(processors);
    List<IndexFilter<CarCustomerBatchSyncDoc>> filters = new ArrayList<>();
    filters.add(customerWechatFullIndexFilter);
    filters.add(customerManualLabelsPushFilter);
    filters.add(carManualLabelsPushFilter);
    filters.add(filter);
    indexFacade.setIndexFilters(filters);
    return indexFacade;
}

@Resource(name = "customerWechatBulkOptions")
private RequestOptions customerWechatBulkOptions;

@Bean
public CustomerWechatFullIndexFilter customerWechatFullIndexFilter(@Qualifier("cluster3Client") SimpleClient cluster3Client) {
    SubmitIndexFilter<CarCustomerFullSyncDoc> submitIndexFilter = new SubmitIndexFilter<>(cluster3Client, carCustomerSubmitEntityBuilders());
    // note this line
    submitIndexFilter.setRequestOptions(customerWechatBulkOptions);

    CustomerWechatFullIndexFilter indexFilter = new CustomerWechatFullIndexFilter();
    indexFilter.setCustomerWechatSubmitFilter(submitIndexFilter);
    return indexFilter;
}
```
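The facade runs each document batch through an ordered list of filters. Stripped to its essence (all type names here are simplified stand-ins, not the real crm-search-index ones), the chain pattern looks like:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the facade's filter-chain idea: each IndexFilter sees the
// whole batch in registration order. Type names are hypothetical simplifications.
public class FilterChainSketch {

    interface IndexFilter<T> {
        List<T> filter(List<T> docs);
    }

    static class Facade<T> {
        private final List<IndexFilter<T>> filters = new ArrayList<>();

        void addFilter(IndexFilter<T> f) { filters.add(f); }

        /** Applies each filter in order, feeding its output to the next one. */
        List<T> process(List<T> docs) {
            for (IndexFilter<T> f : filters) {
                docs = f.filter(docs);
            }
            return docs;
        }
    }

    public static void main(String[] args) {
        Facade<String> facade = new Facade<>();
        facade.addFilter(docs -> docs.stream().filter(d -> !d.isEmpty()).toList()); // drop empties
        facade.addFilter(docs -> docs.stream().map(String::toUpperCase).toList()); // normalize
        System.out.println(facade.process(List.of("a", "", "b"))); // [A, B]
    }
}
```

This is why tracing a single document through crm-search-index means walking every filter registered on the facade, in order.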

Then follow com.f6car.crm.search.core.config.index.ConfigCustomerWechatOptions#customerWechatBulkOptions:

```java
@Bean
public RequestOptions customerWechatBulkOptions(EsApiBuilder customerWechatEsApiBuilder) {
    return new RequestOptions(HttpMethods.POST, customerWechatEsApiBuilder.bulk());
}

@Bean
public EsApiBuilder customerWechatEsApiBuilder() {
    return new EsApiBuilder(customerWechatAlias, docType);
}

@Value("${elasticsearch.index.customer_wechat_alias}")
private String customerWechatAlias;

@Value("${elasticsearch.index.customer_wechat.docType}")
private String docType;
```
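EsApiBuilder presumably assembles the Elasticsearch REST endpoint from the alias and document type. Assuming the classic `/{index}/{type}/_bulk` URL shape of pre-7.x Elasticsearch (an assumption; mapping types were removed in 7.x), a sketch could look like:

```java
// Hypothetical sketch of what an EsApiBuilder#bulk() might produce, assuming the
// pre-7.x Elasticsearch endpoint shape /{index}/{type}/_bulk. Not the real class.
public class EsApiBuilderSketch {
    private final String indexAlias;
    private final String docType;

    public EsApiBuilderSketch(String indexAlias, String docType) {
        this.indexAlias = indexAlias;
        this.docType = docType;
    }

    /** Endpoint for the Bulk API against this index/type. */
    public String bulk() {
        return "/" + indexAlias + "/" + docType + "/_bulk";
    }

    public static void main(String[] args) {
        System.out.println(new EsApiBuilderSketch("customer_wechat_alias_prod", "doc").bulk());
        // /customer_wechat_alias_prod/doc/_bulk
    }
}
```

Either way, the two @Value placeholders are what tie the RequestOptions bean to a concrete index, which is the thread the next step pulls on.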

This finally yields the elasticsearch index:

```
customer_wechat_alias_${spring.profiles.active}
```

2.5、Inspect the search-engine data with a tool

I used the Elasticvue extension for the Edge browser:

img

Inspecting the document format:

img

This is exactly the data that the crm project sends through Kafka to crm-search-index, which in turn pushes it into elasticsearch.

3、Summary

  • Spring manages beans for us, but it also makes the code harder for developers to read; it is a double-edged sword that should be wielded judiciously.
  • The crm-search-index project uses the factory and template patterns. The code logic is clean and implementing features inside the framework is convenient, but reading the code and pinpointing problems is tedious.
  • Both projects involve the Kafka and Elasticsearch middleware, so the related knowledge needs to be brushed up.