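1. Background
A SQL statement called by the XXL-JOB task that syncs WeChat customer pushes (syncPushWxUserInfoJobShardingHandler) triggered a slow query.
Tasks:
- Verify that the modified SQL does not break normal functionality.
- Check whether the pushed data actually takes effect.
2. Investigation
2.1 Determine the push content
The format of the pushed message can be found in com.f6car.crm.push.user.BatchUserPusher#push: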
    {
      "msgMeta": {
        "type": "U",
        "case": "WF"
      },
      "rows": [
        {
          "idCustomer": "14870719207238217425",
          "weChatFollower": 1
        }
      ]
    }
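To make the message layout concrete, here is a minimal sketch that assembles the same JSON by hand. `PushMessageSketch` and `Row` are hypothetical names used only for illustration; the body of BatchUserPusher#push is not shown in this post and presumably relies on a JSON library rather than string concatenation.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper, not part of the crm codebase. */
final class PushMessageSketch {

    /** One entry of "rows": a customer id plus the WeChat follower flag. */
    record Row(String idCustomer, int weChatFollower) {}

    /** Serializes the rows into the message layout shown above (compact form, no whitespace). */
    static String toJson(List<Row> rows) {
        String body = rows.stream()
                .map(r -> "{\"idCustomer\":\"" + r.idCustomer()
                        + "\",\"weChatFollower\":" + r.weChatFollower() + "}")
                .collect(Collectors.joining(","));
        // msgMeta values are copied verbatim from the sample message.
        return "{\"msgMeta\":{\"type\":\"U\",\"case\":\"WF\"},\"rows\":[" + body + "]}";
    }

    public static void main(String[] args) {
        System.out.println(toJson(List.of(new Row("14870719207238217425", 1))));
    }
}
```

Note that idCustomer travels as a string: 14870719207238217425 is larger than Long.MAX_VALUE (9223372036854775807), so it would not fit in a Java long.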
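2.2 Find the Kafka topic being pushed to
The topic is defined in com.f6car.crm.push.user.BatchUserPusher#topic:

    test.erp.batchCustomerCar.search

2.3 Find where the topic is consumed
All search-related functionality lives in the crm-search project. Searching for the **@KafkaListener** annotation leads to the method that listens on the topic: com.f6car.crm.search.index.listener.CarCustomerMessageListener#handleBatchMsg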
    @Autowired
    private KafkaMsgRecordHandler carCustomerBatchKafkaHandler;

    @KafkaListener(topics = "${kafka.topic.car_customer.batch.sync}")
    public void handleBatchMsg(List<ConsumerRecord<String, String>> records) {
        carCustomerBatchKafkaHandler.handle(records);
    }
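2.4 Find the Elasticsearch index
The next step is to find the Spring bean of type KafkaMsgRecordHandler named "carCustomerBatchKafkaHandler". With this style of injection the bean definition can only be located by a global search, which leads to:
com.f6car.crm.search.index.config.index.ConfigCarCustomerIndexFacade#carCustomerBatchKafkaHandler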
    @Bean
    public KafkaMsgRecordHandler carCustomerBatchKafkaHandler(CommonIndexFacade<CarCustomerBatchSyncDoc> carCustomerBatchIndexFacade) {
        return simpleKafkaMsgHandlerFactory.fullSyncMsgHandler(carCustomerBatchIndexFacade);
    }
From there, follow the parameter to com.f6car.crm.search.index.config.index.ConfigCarCustomerIndexFacade#carCustomerBatchIndexFacade:
    @Bean
    public CommonIndexFacade<CarCustomerBatchSyncDoc> carCustomerBatchIndexFacade(CarCustomerBatchSyncParser parser,
                                                                                  CustomerWechatFullIndexFilter customerWechatFullIndexFilter) {
        CommonIndexFacade<CarCustomerBatchSyncDoc> indexFacade = new CommonIndexFacade();
        indexFacade.setDocParser(parser);

        List<IndexFilterProcessor<CarCustomerBatchSyncDoc>> processors = new ArrayList<>();
        processors.add(batchCustomerCarIndexProcessor);
        processors.add(batchNoCarCustomerIndexProcessor);
        IndexFilter<CarCustomerBatchSyncDoc> filter = new SimpleIndexFilter<>(processors);
        List<IndexFilter<CarCustomerBatchSyncDoc>> filters = new ArrayList<>();
        filters.add(customerWechatFullIndexFilter);
        filters.add(customerManualLabelsPushFilter);
        filters.add(carManualLabelsPushFilter);
        filters.add(filter);
        indexFacade.setIndexFilters(filters);
        return indexFacade;
    }

    @Resource(name = "customerWechatBulkOptions")
    private RequestOptions customerWechatBulkOptions;

    @Bean
    public CustomerWechatFullIndexFilter customerWechatFullIndexFilter(@Qualifier("cluster3Client") SimpleClient cluster3Client) {
        SubmitIndexFilter<CarCustomerFullSyncDoc> submitIndexFilter = new SubmitIndexFilter<CarCustomerFullSyncDoc>(cluster3Client, carCustomerSubmitEntityBuilders());
        submitIndexFilter.setRequestOptions(customerWechatBulkOptions);

        CustomerWechatFullIndexFilter indexFilter = new CustomerWechatFullIndexFilter();
        indexFilter.setCustomerWechatSubmitFilter(submitIndexFilter);
        return indexFilter;
    }
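The configuration above wires a parser and an ordered list of filters into one facade. The internals of CommonIndexFacade are not shown in this post, so the following is only a guess at the general pattern: parse the raw message once, then hand the resulting document to each filter in registration order. `IndexFacadeSketch` and its nested `Filter` interface are hypothetical stand-ins, not the real crm-search types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for a facade that owns a parser and a filter chain. */
final class IndexFacadeSketch<D> {

    /** Hypothetical stand-in for IndexFilter: processes one parsed document. */
    interface Filter<D> {
        void apply(D doc);
    }

    private final Function<String, D> parser;
    private final List<Filter<D>> filters = new ArrayList<>();

    IndexFacadeSketch(Function<String, D> parser) {
        this.parser = parser;
    }

    /** Registers a filter; filters run in the order they were added. */
    IndexFacadeSketch<D> addFilter(Filter<D> filter) {
        filters.add(filter);
        return this;
    }

    /** Parses the raw message once, then passes the document through every filter. */
    void handle(String rawMessage) {
        D doc = parser.apply(rawMessage);
        for (Filter<D> filter : filters) {
            filter.apply(doc);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        IndexFacadeSketch<String> facade = new IndexFacadeSketch<String>(String::trim)
                .addFilter(doc -> log.add("customerWechatFullIndexFilter:" + doc))
                .addFilter(doc -> log.add("simpleIndexFilter:" + doc));
        facade.handle("  msg-1 ");
        System.out.println(log);
    }
}
```

If the real facade works along these lines, the filter to follow for the WeChat data is customerWechatFullIndexFilter, since it is the one that receives customerWechatBulkOptions.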
Next, look at com.f6car.crm.search.core.config.index.ConfigCustomerWechatOptions#customerWechatBulkOptions:
    @Bean
    public RequestOptions customerWechatBulkOptions(EsApiBuilder customerWechatEsApiBuilder) {
        return new RequestOptions(HttpMethods.POST, customerWechatEsApiBuilder.bulk());
    }
    @Bean
    public EsApiBuilder customerWechatEsApiBuilder() {
        return new EsApiBuilder(customerWechatAlias, docType);
    }
    @Value("${elasticsearch.index.customer_wechat_alias}")
    private String customerWechatAlias;
    @Value("${elasticsearch.index.customer_wechat.docType}")
    private String docType;
This finally gives the Elasticsearch index:
    customer_wechat_alias_${spring.profiles.active}
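The `${spring.profiles.active}` part is a property placeholder that is filled in at startup. As a rough illustration of what that substitution amounts to, here is a small standalone resolver. It is not Spring's implementation (no default values, no nesting), `PlaceholderSketch` is a hypothetical name, and the profile value "test" is an assumption made for the example.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical, simplified placeholder resolver; not Spring's actual logic. */
final class PlaceholderSketch {

    // Matches ${key} and captures the key.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces every ${key} in the template with its value from props. */
    static String resolve(String template, Map<String, String> props) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            String key = matcher.group(1);
            String value = props.get(key);
            if (value == null) {
                throw new IllegalArgumentException("Unresolved placeholder: " + key);
            }
            // quoteReplacement keeps '$' and '\' in the value from being read as group references.
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // "test" is an assumed profile name.
        Map<String, String> props = Map.of("spring.profiles.active", "test");
        System.out.println(resolve("customer_wechat_alias_${spring.profiles.active}", props));
    }
}
```

Under that assumed profile, the name to look for in Elasticsearch would be customer_wechat_alias_test.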
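2.5 Inspect the search engine data with a tool
I used the Elasticvue extension for the Edge browser to look at the documents and their format.
The data here is what the crm project sends through Kafka to crm-search-index, which then pushes it to Elasticsearch.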
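The same check can be scripted instead of done in a browser extension. The sketch below builds a standard Elasticsearch `_search` request with a term query using only the JDK HTTP client types. Several things are assumptions: the base URL, the resolved index name, and that the document has an `idCustomer` field mapped so that a term query matches it exactly. `EsLookupSketch` is a hypothetical name.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that builds (but does not send) an Elasticsearch search request. */
final class EsLookupSketch {

    /** Builds a term query body; field and value are inserted as-is, so they must not need JSON escaping. */
    static String termQuery(String field, String value) {
        return "{\"query\":{\"term\":{\"" + field + "\":\"" + value + "\"}}}";
    }

    /** Builds a POST {baseUrl}/{index}/_search request carrying the given JSON body. */
    static HttpRequest searchRequest(String baseUrl, String index, String body) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + index + "/_search"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        // Base URL, index name, and field name are assumptions for illustration.
        String body = termQuery("idCustomer", "14870719207238217425");
        HttpRequest request = searchRequest("http://localhost:9200", "customer_wechat_alias_test", body);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body);
        // To actually run the query against a reachable cluster:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```

Comparing the returned document's weChatFollower value before and after the job runs is one way to confirm that the push took effect.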
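3. Summary
- While Spring manages beans for us, it also makes the code harder for developers to read. It is a double-edged sword and should be used judiciously.
- The factory and template patterns used in crm-search-index keep the code logic clean and make it convenient to implement features within the framework, but reading the code and locating problems is more tedious.
- Both projects rely on Kafka and Elasticsearch as middleware, so I need to build up the relevant knowledge.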