# Asynchronous
Four ways to create threads
1) Extend Thread
2) Implement the Runnable interface
3) Implement the Callable interface + FutureTask (you can get the returned result and handle exceptions)
4) Thread pool
Methods 1 and 2: the main thread cannot obtain the result computed by the thread, so they are not suitable for this scenario.
Method 3: the main thread can obtain the result, but the thread resources on the server are not controlled, which can exhaust the server's resources.
Method 4: a thread pool can be initialized in two ways: with the Executors factory methods, or with new ThreadPoolExecutor(...).
Why use a thread pool in development
- Reduce resource consumption
  Reusing already-created threads reduces the cost of creating and destroying threads.
- Improve response speed
  While the number of threads in the pool is below its maximum, some threads are idle and waiting for tasks, so a new task can run without first creating a thread.
- Improve thread manageability
  The pool tunes its threads to the characteristics of the current system, which reduces the overhead of creating and destroying them. Unbounded thread creation and destruction not only consumes system resources but also makes the system less stable; a thread pool allocates threads centrally.
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadTest {

    // Thread pool created through Executors
    public static ExecutorService service = Executors.newFixedThreadPool(10);

    /**
     * 1) Extend Thread
     *      Thread01 thread = new Thread01();
     *      thread.start(); // start the thread
     *
     * 2) Implement the Runnable interface
     *      Runable01 runable01 = new Runable01();
     *      new Thread(runable01).start();
     *
     * 3) Implement the Callable interface + FutureTask (you can get the returned result and handle exceptions)
     *      FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
     *      new Thread(futureTask).start();
     *      // Blocks until the thread has finished, then returns the result
     *      Integer integer = futureTask.get();
     *
     * 4) Thread pool [ExecutorService]
     *      Submit tasks directly to the thread pool:
     *      service.execute(new Runable01());
     *      Ways to create one:
     *        1) Executors
     *        2) new ThreadPoolExecutor
     *
     * Future: lets you obtain the asynchronous result
     *
     * Differences:
     *   1 and 2 cannot return a value; 3 can.
     *   1, 2 and 3 cannot control resources.
     *   4 can control resources, so performance is stable.
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main....start....");

        // Extend Thread
        // T1 t1 = new T1();
        // t1.start();
        /*
         * main....start....
         * main....end....
         * Current thread: 11
         * T1
         */

        // R1 r1 = new R1();
        // new Thread(r1).start();
        /*
         * main....start....
         * main....end....
         * Current thread: 11
         * R1
         */

        // FutureTask<Integer> futureTask = new FutureTask<>(new C1());
        // new Thread(futureTask).start();
        // Integer integer = futureTask.get(); // once this line is added, main blocks until the result is ready
        // System.out.println("R1 returned result: " + integer);
        /*
         * Without get():
         * main....start....
         * main....end....
         * Current thread: 11
         * C1
         */
        /*
         * With get():
         * main....start....
         * Current thread: 11
         * C1
         * R1 returned result: 66
         * main....end....
         */

        // Thread pools from Executors:
        //   Executors.newCachedThreadPool()      core is 0; all threads are reclaimable
        //   Executors.newFixedThreadPool()       fixed size, core = max; threads are never reclaimed
        //   Executors.newScheduledThreadPool()   thread pool for scheduled tasks
        //   Executors.newSingleThreadExecutor()  single-threaded pool; takes tasks from the queue and runs them one by one
        service.execute(new R1());
        /*
         * main....start....
         * main....end....
         * Current thread: 11
         * R1
         */

        // Thread pool created directly with ThreadPoolExecutor
        /*
         * Seven parameters
         * corePoolSize: [5] number of core threads (kept alive unless allowCoreThreadTimeOut is set);
         *      the threads kept ready to accept asynchronous tasks. By default they are created on demand
         *      as tasks arrive, unless prestartAllCoreThreads() is called.
         * maximumPoolSize: [200] maximum number of threads; controls resources
         * keepAliveTime: survival time. When the current thread count exceeds corePoolSize, idle threads
         *      (up to maximumPoolSize - corePoolSize of them) are released once they have been idle
         *      longer than keepAliveTime.
         * unit: time unit
         * BlockingQueue<Runnable> workQueue: blocking queue. If there are many tasks, the excess tasks wait in the queue.
         *      As soon as a thread is idle, it takes a new task from the queue and executes it.
         * threadFactory: factory that creates the threads
         * RejectedExecutionHandler handler: if the queue is full, tasks are rejected according to the policy we specify
         *
         * Work sequence:
         * 1) Create the thread pool with its core thread count and get ready to accept tasks
         *   1.1 When the core threads are all busy, incoming tasks go into the blocking queue.
         *       An idle core thread takes tasks from the queue itself.
         *   1.2 When the blocking queue is full, new threads are started directly, up to the number specified by max
         *   1.3 When max is reached, tasks are rejected through the RejectedExecutionHandler
         *   1.4 When the work is done and threads sit idle, max - core threads are released after keepAliveTime
         *
         * new LinkedBlockingDeque<>(): the default capacity is Integer.MAX_VALUE, which can exhaust memory
         *
         * A thread pool with core 7, max 20, queue 50: how are 100 concurrent tasks allocated?
         * 7 are executed immediately, 50 enter the queue, and 13 more threads are started. The remaining 30 go to the rejection policy.
         * If you do not want to discard them, use CallerRunsPolicy so the submitting thread runs them.
         */
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5,
                200,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(100000),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        System.out.println("main....end....");
    }

    // Extend Thread
    public static class T1 extends Thread {
        @Override
        public void run() {
            System.out.println("Current thread:" + Thread.currentThread().getId());
            System.out.println("T1");
        }
    }

    // Implement the Runnable interface
    public static class R1 implements Runnable {
        @Override
        public void run() {
            System.out.println("Current thread:" + Thread.currentThread().getId());
            System.out.println("R1");
        }
    }

    // Implement the Callable interface + FutureTask (you can get the returned result and handle exceptions)
    public static class C1 implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("Current thread:" + Thread.currentThread().getId());
            System.out.println("C1");
            return 66;
        }
    }
}
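The "core 7, max 20, queue 50, 100 tasks" allocation described in the comments above can be checked with a small experiment. The following is a minimal sketch, not part of the original project: the class name PoolSaturationDemo and the method countRejected are made up for illustration. Every task blocks on a latch so that no thread becomes free while tasks are still being submitted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original notes.
public class PoolSaturationDemo {

    /** Submits {@code tasks} blocking tasks and returns how many of them were rejected. */
    public static int countRejected(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                core, max, 10, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each task parks until the latch opens, so no worker frees up during submission.
                    executor.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // AbortPolicy throws once the queue is full and max threads are running.
                    rejected++;
                }
            }
        } finally {
            release.countDown();  // let the accepted tasks finish
            executor.shutdown();  // allow the worker threads to exit
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 7 run on core threads, 50 wait in the queue, 13 run on extra threads, 30 are rejected.
        System.out.println("rejected: " + countRejected(7, 20, 50, 100));
    }
}
```

Swapping AbortPolicy for ThreadPoolExecutor.CallerRunsPolicy would make the submitting thread run the overflow tasks instead of rejecting them; with the blocking tasks used here that would park the submitter on the latch, so the sketch keeps AbortPolicy.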
CompletableFuture
Asynchronous orchestration
whenComplete receives both the normal result and the exception; exceptionally handles only the exceptional case and can supply a fallback value.
The difference between whenComplete and whenCompleteAsync:
whenComplete: the callback runs on the thread that completed the current task.
whenCompleteAsync: the callback is submitted to a thread pool for execution.
Methods without the Async suffix run the action on the thread that completed the previous stage (or on the calling thread if that stage had already completed). Methods with the Async suffix submit the action to an executor, so it may run on a different thread (if the same thread pool is used, the same thread may happen to be picked).
public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main....start....");

    // CompletableFuture.runAsync(() -> {
    //     System.out.println("Current thread: " + Thread.currentThread().getId());
    //     System.out.println("CompletableFuture...");
    // }, service).whenComplete((res, ex) -> {
    //     System.out.println("Asynchronously completed... Result: " + res + ". Exception: " + ex);
    // });
    /*
     * main....start....
     * main....end....
     * Current thread: 11
     * CompletableFuture...
     * Asynchronously completed... Result: null. Exception: null
     */

    // CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
    //     System.out.println("Current thread: " + Thread.currentThread().getId());
    //     System.out.println("CompletableFuture...");
    //     return 10 / 0;
    // }, service).whenComplete((res, ex) -> {
    //     System.out.println("Asynchronously completed... Result: " + res + ". Exception: " + ex);
    // }).exceptionally((t) -> {
    //     System.out.println("exceptionally: " + t);
    //     return 9;
    // });
    // Integer integer = completableFuture1.get();
    // System.out.println(integer);
    /*
     * main....start....
     * Current thread: 11
     * CompletableFuture...
     * Asynchronously completed... Result: null. Exception: java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
     * exceptionally: java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
     * 9
     * main....end....
     */

    CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("Current thread: " + Thread.currentThread().getId());
        System.out.println("CompletableFuture...");
        return 10 / 1;
    }, service).handle((t, u) -> { // R apply(T t, U u);
        System.out.println("handle:");
        if (t != null) {
            System.out.println("Result present: " + t);
            return 8;
        }
        if (u != null) {
            System.out.println("Exception present: " + u);
            return 9;
        }
        return 5;
    });
    Integer integer = completableFuture2.get();
    System.out.println(integer);
    /*
     * Output captured with the supplier changed to `return 10 / 0;`:
     * main....start....
     * Current thread: 11
     * CompletableFuture...
     * handle:
     * Exception present: java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
     * 9
     * main....end....
     */

    System.out.println("main....end....");
}
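To make the difference between the three callbacks concrete, here is a compact sketch that runs both the success and the failure path. The class and method names are made up for illustration, and it uses the default common pool rather than the `service` pool from the notes. It shows that whenComplete only observes the outcome, exceptionally replaces a failure with a fallback, and handle always produces the next value.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original notes.
public class CompletionCallbacksDemo {

    /** whenComplete only observes the outcome; exceptionally supplies a fallback on failure. */
    public static int withFallback(int divisor) {
        return CompletableFuture.supplyAsync(() -> 10 / divisor)
                .whenComplete((res, ex) ->
                        System.out.println("result=" + res + ", exception=" + ex))
                .exceptionally(ex -> 9)
                .join();
    }

    /** handle sees both the result and the exception and always returns the next value. */
    public static int withHandle(int divisor) {
        return CompletableFuture.supplyAsync(() -> 10 / divisor)
                .handle((res, ex) -> ex != null ? 9 : res * 2)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(withFallback(1)); // success path: the original result passes through
        System.out.println(withFallback(0)); // failure path: the fallback value is used
        System.out.println(withHandle(1));   // success path: the result is transformed
        System.out.println(withHandle(0));   // failure path: handle returns its own value
    }
}
```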
Serial execution
- thenRun: cannot access the result of the previous step; no return value
- thenAcceptAsync: receives the result of the previous step; no return value
- thenApplyAsync: receives the result of the previous step and returns a value
// CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
//     System.out.println("Current thread: " + Thread.currentThread().getId());
//     System.out.println("CompletableFuture...");
//     return 10;
// }, service).thenApplyAsync((u) -> {
//     System.out.println("Return value " + u);
//     System.out.println("Task 2 start");
//     return 5;
// });
// System.out.println(completableFuture2.get());
/*
 * main....start....
 * Current thread: 11
 * CompletableFuture...
 * Return value 10
 * Task 2 start
 * 5
 * main....end....
 */
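The three chaining methods listed above differ only in what the callback receives and returns. A side-by-side sketch, using the non-Async variants and the default pool for brevity; the class name is made up for illustration:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original notes.
public class SerialStagesDemo {

    public static int chain() {
        CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> 10);

        // thenRun: no access to the previous result, no return value
        CompletableFuture<Void> ran = first.thenRun(() -> System.out.println("task 2 started"));

        // thenAccept: consumes the previous result, no return value
        CompletableFuture<Void> accepted = first.thenAccept(v -> System.out.println("received " + v));

        // thenApply: consumes the previous result and returns a new one
        CompletableFuture<Integer> applied = first.thenApply(v -> v + 5);

        ran.join();
        accepted.join();
        return applied.join();
    }

    public static void main(String[] args) {
        System.out.println(chain());
    }
}
```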
Combining two tasks - both must complete
- runAfterBothAsync: combines two tasks; cannot access their results; no return value
- thenAcceptBothAsync: combines two tasks; receives both results; no return value
- thenCombineAsync: combines two tasks; receives both results and returns a value
// CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> {
//     System.out.println("Current thread: " + Thread.currentThread().getId());
//     System.out.println("Task 1...");
//     return 111;
// }, service);
// CompletableFuture<Integer> completableFuture4 = CompletableFuture.supplyAsync(() -> {
//     System.out.println("Current thread: " + Thread.currentThread().getId());
//     System.out.println("Task 2...");
//     return 222;
// }, service);

// completableFuture3.runAfterBothAsync(completableFuture4, () -> {
//     System.out.println("Task 3...");
// }, service);
/*
 * main....start....
 * main....end....
 * Current thread: 11
 * Task 1
 * Current thread: 12
 * Task 2
 * Task 3
 */

// completableFuture3.thenAcceptBothAsync(completableFuture4, (f1, f2) -> {
//     System.out.println("Task 3...");
//     System.out.println("f1:" + f1 + ".f2:" + f2);
// }, service);
/*
 * main....start....
 * main....end....
 * Current thread: 11
 * Task 1
 * Current thread: 12
 * Task 2
 * Task 3
 * f1:111.f2:222
 */

// CompletableFuture<Integer> integerCompletableFuture = completableFuture3.thenCombineAsync(completableFuture4, (f1, f2) -> {
//     System.out.println("Task 3...");
//     System.out.println("f1:" + f1 + ".f2:" + f2);
//     return 3;
// }, service);
// System.out.println(integerCompletableFuture.get());
/*
 * main....start....
 * Current thread: 11
 * Task 1
 * Current thread: 12
 * Task 2
 * Task 3
 * f1:111.f2:222
 * 3
 * main....end....
 */
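A typical use of thenCombine is to merge two independent lookups into one value. The sketch below simply adds the two results; the class name is made up for illustration and the default pool is used instead of `service`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original notes.
public class CombineBothDemo {

    /** Runs two independent tasks and combines their results once both have completed. */
    public static int combine(int a, int b) {
        CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> a);
        CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> b);
        return f1.thenCombine(f2, Integer::sum).join();
    }

    public static void main(String[] args) {
        System.out.println(combine(111, 222));
    }
}
```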
Combining two tasks - either one completes
- runAfterEither: runs once either of the two tasks completes; it does not receive the result and has no return value.
- acceptEither: once either of the two tasks completes, receives its result and processes it; no new return value. The two tasks must have compatible result types.
- applyToEither: once either of the two tasks completes, receives its result, processes it, and returns a new value.
// CompletableFuture<Integer> completableFuture5 = CompletableFuture.supplyAsync(() -> {
//     System.out.println("Current thread: " + Thread.currentThread().getId());
//     System.out.println("Task 1...");
//     return 111;
// }, service);
//
// CompletableFuture<Integer> completableFuture6 = CompletableFuture.supplyAsync(() -> {
//     System.out.println("Current thread: " + Thread.currentThread().getId());
//     try {
//         Thread.sleep(2000);
//     } catch (InterruptedException e) {
//         e.printStackTrace();
//     }
//     System.out.println("Task 2...");
//     return 222;
// }, service);

// completableFuture5.runAfterEitherAsync(completableFuture6, () -> {
//     System.out.println("Task 3...");
// }, service);
/*
 * main....start....
 * main....end....
 * Current thread: 11
 * Task 1
 * Current thread: 12
 * Task 3
 * Task 2
 */

// completableFuture5.acceptEitherAsync(completableFuture6, (f1) -> {
//     System.out.println("f1:" + f1);
//     System.out.println("Task 3...");
// }, service);
/*
 * main....start....
 * Current thread: 11
 * Task 1
 * Current thread: 12
 * main....end....
 * f1:111
 * Task 3
 * Task 2
 */

// CompletableFuture<Integer> integerCompletableFuture = completableFuture5.applyToEitherAsync(completableFuture6, (f1) -> {
//     System.out.println("f1:" + f1);
//     System.out.println("Task 3...");
//     return 6;
// }, service);
// System.out.println(integerCompletableFuture.get());
/*
 * main....start....
 * Current thread: 11
 * Task 1
 * Current thread: 12
 * f1:111
 * Task 3
 * 6
 * main....end....
 * Task 2
 */
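The "either" variants are easiest to see when one side cannot win. In this sketch the slow task is replaced by a future that is never completed, which is an assumption made purely to keep the outcome deterministic; the class name is made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original notes.
public class EitherDemo {

    /** Applies the function to whichever future finishes first; here only `fast` can finish. */
    public static int firstPlusOne() {
        CompletableFuture<Integer> fast = CompletableFuture.supplyAsync(() -> 111);
        CompletableFuture<Integer> never = new CompletableFuture<>(); // stands in for a slow task
        return fast.applyToEither(never, v -> v + 1).join();
    }

    public static void main(String[] args) {
        System.out.println(firstPlusOne());
    }
}
```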
Combining multiple tasks
- allOf: wait for all tasks to complete
- anyOf: completes as soon as any one task completes
CompletableFuture<Integer> completableFuture7 = CompletableFuture.supplyAsync(() -> { System.out.println("Current thread:" + Thread.currentThread().getId()); System.out.println("Task 1..."); return 111; }, service); CompletableFuture<Integer> completableFuture8 = CompletableFuture.supplyAsync(() -> { System.out.println("Current thread:" + Thread.currentThread().getId()); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task 2..."); return 222; }, service); CompletableFuture<Integer> completableFuture9 = CompletableFuture.supplyAsync(() -> { System.out.println("Current thread:" + Thread.currentThread().getId()); System.out.println("Task 3..."); return 333; }, service); // CompletableFuture<Void> allOf = CompletableFuture.allOf(completableFuture7, completableFuture8, completableFuture9); // allOf.get(); // System.out.println("completableFuture7: " + completableFuture7.get() + "completableFuture8: " + completableFuture8.get() + "completableFuture9: " + completableFuture9.get()); /* * main....start.... Current thread: 11 Task 1 Current thread: 12 Current thread: 13 Task 3 Task 2 completableFuture7: 111completableFuture8: 222completableFuture9: 333 main....end.... * */ CompletableFuture<Object> anyOf = CompletableFuture.anyOf(completableFuture7, completableFuture8, completableFuture9); System.out.println("anyOf:"+anyOf.get()); /* * main....start.... Current thread: 11 Task 1 anyOf:111 main....end.... Current thread: 12 Current thread: 13 Task 3 Task 2 * */
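Because allOf returns CompletableFuture<Void>, the individual results still have to be read from each future afterwards, as the commented-out code above does with get(). One common way to gather them into a list is sketched below; the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical demo class, not part of the original notes.
public class AllOfCollectDemo {

    /** Waits for every future and returns the results in the same order as the input list. */
    public static List<Integer> collect(List<CompletableFuture<Integer>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // Once `all` is done, join() on each future returns immediately.
        return all.thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    public static List<Integer> demo() {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 111),
                CompletableFuture.supplyAsync(() -> 222),
                CompletableFuture.supplyAsync(() -> 333));
        return collect(futures);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```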
Product details
@Override public SkuItemVo item(Long skuId) { SkuItemVo skuItemVo = new SkuItemVo(); // 1. Get the basic information PMS of SKU_ sku_ info SkuInfoEntity info; CompletableFuture<SkuInfoEntity> skuInfoEntityCompletableFuture = CompletableFuture.supplyAsync(() -> { SkuInfoEntity skuInfoEntity = getById(skuId); skuItemVo.setInfo(skuInfoEntity); return skuInfoEntity; }, threadPoolExecutor); // 2. Get the picture information PMS of SKU_ sku_ images List<SkuImagesEntity> images; CompletableFuture<Void> imagesFuture = CompletableFuture.runAsync(() -> { List<SkuImagesEntity> images = skuImagesService.getImagesBySkuId(skuId); skuItemVo.setImages(images); }, threadPoolExecutor); // Thread serialization // 3. Get the sales attribute list of sku < skuitemvo SkuItemSaleAttrVo> saleAttr; CompletableFuture<Void> saleAttrFuture = skuInfoEntityCompletableFuture.thenAcceptAsync((res) -> { List<SkuItemSaleAttrVo> skuItemSaleAttrVos = skuSaleAttrValueService.getSaleAttrsBySpuId(res.getSpuId()); skuItemVo.setSaleAttr(skuItemSaleAttrVos); }, threadPoolExecutor); // //4. Get spu infodescentity desp; CompletableFuture<Void> despFuture = skuInfoEntityCompletableFuture.thenAcceptAsync((res) -> { SpuInfoDescEntity descEntity = spuInfoDescService.getById(res.getSpuId()); skuItemVo.setDesp(descEntity); }, threadPoolExecutor); // //5. Get the specification parameter group and the specification parameter list < skuitemvo SpuItemAttrGroupVo> groupAttrs; CompletableFuture<Void> groupAttrsFuture = skuInfoEntityCompletableFuture.thenAcceptAsync((res) -> { List<SpuItemAttrGroupVo> groupAttrs = attrGroupService.getAttrGroupWithAttrsBySpuIdAndCatelogId(res.getSpuId(), res.getCatalogId()); skuItemVo.setGroupAttrs(groupAttrs); }, threadPoolExecutor); // Thread blocking try { CompletableFuture.allOf(imagesFuture,saleAttrFuture,despFuture,groupAttrsFuture).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return skuItemVo; }
<resultMap id="SkuItemSaleAttrVo" type="com.atguigu.gulimall.product.vo.SkuItemSaleAttrVo"> <result column="attr_id" property="attrId"></result> <result column="attr_name" property="attrName"></result> <collection property="attrValues" ofType="com.atguigu.gulimall.product.vo.AttrValueWithSkuIdVo"> <result column="attr_value" property="attrValue"></result> <result column="sku_ids" property="skuIds"></result> </collection> </resultMap> <select id="getSaleAttrsBySpuId" resultMap="SkuItemSaleAttrVo"> SELECT ssav.attr_id attr_id, ssav.attr_name attr_name, ssav.attr_value attr_value, GROUP_CONCAT(distinct si.sku_id) sku_ids FROM `pms_sku_info` as si LEFT JOIN `pms_sku_sale_attr_value` as ssav on si.sku_id = ssav.sku_id WHERE si.spu_id = #{spuId} GROUP BY ssav.attr_id,ssav.attr_name,ssav.attr_value </select> <!-- <select id="getSaleAttrsBySpuId" resultType="com.atguigu.gulimall.product.vo.SkuItemSaleAttrVo">--> <!-- SELECT--> <!-- ssav.attr_id attr_id,--> <!-- ssav.attr_name attr_name,--> <!-- GROUP_CONCAT(distinct ssav.attr_value) attr_values--> <!-- FROM `pms_sku_info` as si--> <!-- LEFT JOIN `pms_sku_sale_attr_value` as ssav on si.sku_id = ssav.sku_id--> <!-- WHERE si.spu_id = #{spuId}--> <!-- GROUP BY ssav.attr_id,ssav.attr_name--> <!-- </select>--> <!--SELECT ag.attr_group_id, ag.attr_group_name, aar.attr_id, attr.attr_name, pav.attr_value, pav.spu_id from `pms_attr_group` as ag LEFT JOIN `pms_attr_attrgroup_relation` as aar on ag.attr_group_id = aar.attr_group_id LEFT JOIN `pms_attr` as attr on aar.attr_id = attr.attr_id LEFT JOIN `pms_product_attr_value` as pav on pav.attr_id = attr.attr_id WHERE ag.catelog_id = 225 and pav.spu_id = 17; SELECT ssav.attr_id attr_id, ssav.attr_name attr_name, GROUP_CONCAT(distinct ssav.attr_value) attr_values FROM `pms_sku_info` as si LEFT JOIN `pms_sku_sale_attr_value` as ssav on si.sku_id = ssav.sku_id WHERE si.spu_id = 23 GROUP BY ssav.attr_id,ssav.attr_name; SELECT ssav.attr_id attr_id, ssav.attr_name attr_name, ssav.attr_value 
attr_value, GROUP_CONCAT(distinct si.sku_id) sku_id FROM `pms_sku_info` as si LEFT JOIN `pms_sku_sale_attr_value` as ssav on si.sku_id = ssav.sku_id WHERE si.spu_id = 23 GROUP BY ssav.attr_id,ssav.attr_name,ssav.attr_value -->
Authentication service
Account and password login
@Controller public class LoginController { @Autowired private ThirdPartFeignService thirdPartFeignService; @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private MemberFeignService memberFeignService; @ResponseBody @GetMapping("/sms/sendcode") public R sendCode(@RequestParam("phone") String phone) { // redis cache prevents SMS interface from being brushed String s = stringRedisTemplate.opsForValue().get(AuthServerConstant.SMS_CODE_CACHE_PREFIX + phone); if (!StringUtils.isEmpty(s)) { long l = Long.parseLong(s.split("_")[1]); if (System.currentTimeMillis() - l < 60000) { // No sending within 60 seconds return R.error(BizCodeEnum.SMS_CODE_EXCEPTION.getCode(), BizCodeEnum.SMS_CODE_EXCEPTION.getMessage()); } } // Send SMS and cache redis String code = UUID.randomUUID().toString().substring(0, 5); String codeRedisValue = code + "_" + System.currentTimeMillis(); // second stringRedisTemplate.opsForValue().set(AuthServerConstant.SMS_CODE_CACHE_PREFIX + phone, codeRedisValue, 10, TimeUnit.MINUTES); thirdPartFeignService.sendCode(phone, code); return R.ok(); } /** * //TODO Redirection carries data, using the session principle. Put the data in the session. * As long as you skip to the next page and take out the data, the data in the session will be deleted * <p> * //TODO 1,session problem under distributed. 
* RedirectAttributes redirectAttributes: Simulated redirection carrying data * * @param vo * @param bindingResult * @param redirectAttributes * @return */ @PostMapping("/regist") public String regist(@Valid UserRegistVo vo, BindingResult bindingResult, RedirectAttributes redirectAttributes) { // Validation parameters if (bindingResult.hasErrors()) { // Validation parameter error Map<String, String> errors = bindingResult.getFieldErrors().stream().collect(Collectors.toMap( FieldError::getField, FieldError::getDefaultMessage, (entity1, entity2) -> entity1 // Solve Java lang.IllegalStateException: Duplicate key )); /*model*/ /* model.addAttribute("errors",errors); return "reg";*/ /*RedirectAttributes*/ redirectAttributes.addFlashAttribute("errors", errors); return "redirect:http://auth.gulimall.com/reg.html"; } // Check verification code String code = stringRedisTemplate.opsForValue().get(AuthServerConstant.SMS_CODE_CACHE_PREFIX + vo.getPhone()); if (!StringUtils.isEmpty(code)) { if (vo.getCode().equals(code.split("_")[0])) { // Member registration - Remote // Delete redis stringRedisTemplate.delete(AuthServerConstant.SMS_CODE_CACHE_PREFIX + vo.getPhone()); // long-range R regist = memberFeignService.regist(vo); if (regist.getCode() == 0 ){ // login was successful return "redirect:http://auth.gulimall.com/login.html"; }else{ Map<String, String> errors = new HashMap<>(); String msg = (String) regist.getData("msg",new TypeReference<String>() {}); errors.put("msg",msg); redirectAttributes.addFlashAttribute("errors",errors); return "redirect:http://auth.gulimall.com/reg.html"; } } else { Map<String, String> errors = new HashMap<>(); errors.put("code", "Verification code error"); redirectAttributes.addFlashAttribute("errors", errors); return "redirect:http://auth.gulimall.com/reg.html"; } } else { Map<String, String> errors = new HashMap<>(); errors.put("code", "Verification code error"); redirectAttributes.addFlashAttribute("errors", errors); return 
"redirect:http://auth.gulimall.com/reg.html"; } // return "redirect:http://auth.gulimall.com/reg.html"; } @GetMapping("login.html") public String login() { return "login"; } @PostMapping("login") public String login(UserLoginVo vo, RedirectAttributes redirectAttributes) { //Remote login R login = memberFeignService.login(vo); if (login.getCode() == 0){ return "redirect:http://gulimall.com"; }else { HashMap<String, String> errors = new HashMap<>(); Object msg = login.getData("msg", new TypeReference<String>() { }); errors.put("msg", (String) msg); redirectAttributes.addFlashAttribute("errors",errors); return "redirect:http://auth.gulimall.com/login.html"; } } }
/** * Registered member * @param vo */ @Override public void regist(MemberRegistVo vo) { MemberEntity memberEntity = new MemberEntity(); // Set default level MemberLevelEntity levelEntity = memberLevelDao.getDefaultLevel(); memberEntity.setLevelId(levelEntity.getId()); //Check whether the user name and mobile phone number are unique. In order for the controller to perceive exceptions, the exception mechanism checkPhoneUnique(vo.getPhone()); checkUsernameUnique(vo.getUserName()); memberEntity.setMobile(vo.getPhone()); memberEntity.setUsername(vo.getUserName()); memberEntity.setNickname(vo.getUserName()); // The password should be encrypted and stored - bcryptpasswordencoder, which comes with springboot memberEntity.setPassword(new BCryptPasswordEncoder().encode(vo.getPassword())); //Other default information // preservation this.baseMapper.insert(memberEntity); } /** * land * @param vo * @return */ @Override public MemberEntity login(MemberLoginVo vo) { String loginacct = vo.getLoginacct(); String password = vo.getPassword(); MemberEntity memberEntity = baseMapper.selectOne(new QueryWrapper<MemberEntity>().eq("username", loginacct).or().eq("mobile", loginacct)); if (memberEntity != null) { // Password matching boolean matches = new BCryptPasswordEncoder().matches(password, memberEntity.getPassword()); if (matches) { return memberEntity; } else { return null; } } else { return null; } } private void checkUsernameUnique(String userName) { Integer count = this.baseMapper.selectCount(new QueryWrapper<MemberEntity>().eq("username", userName)); if (count > 0) { throw new UsernameExistException(); } } private void checkPhoneUnique(String phone) { Integer mobile = this.baseMapper.selectCount(new QueryWrapper<MemberEntity>().eq("mobile", phone)); if (mobile > 0) { throw new PhoneExsitException(); } }
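The resend check in sendCode and the code comparison in regist both rely on the cached value having the form code_timestamp. The following sketch isolates that logic in plain Java so it can be tried without Redis. The class SmsCodeThrottle and its method names are made up for illustration; only the value format and the 60-second window come from the controller code above.

```java
// Hypothetical helper, not part of the original project.
public class SmsCodeThrottle {

    static final long RESEND_INTERVAL_MS = 60_000; // no resend within 60 seconds

    /** Builds the cached value "code_timestamp", mirroring codeRedisValue in sendCode. */
    public static String encode(String code, long nowMillis) {
        return code + "_" + nowMillis;
    }

    /** Returns true if a new code may be sent; cached is null or empty when nothing is stored. */
    public static boolean maySend(String cached, long nowMillis) {
        if (cached == null || cached.isEmpty()) {
            return true;
        }
        long sentAt = Long.parseLong(cached.split("_")[1]);
        return nowMillis - sentAt >= RESEND_INTERVAL_MS;
    }

    /** Compares the submitted code with the code part of the cached value. */
    public static boolean matches(String cached, String submitted) {
        return cached != null && !cached.isEmpty() && cached.split("_")[0].equals(submitted);
    }

    public static void main(String[] args) {
        String cached = encode("a1b2c", 1_000);
        System.out.println(maySend(cached, 30_000)); // still inside the 60-second window
        System.out.println(maySend(cached, 61_000)); // window has passed
        System.out.println(matches(cached, "a1b2c"));
    }
}
```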
OAuth2.0
@GetMapping("/oauth2.0/weibo/success") public String loginFromWB(@RequestParam("code") String code, HttpSession session) throws Exception { Map<String, String> header = new HashMap<>(); Map<String, String> query = new HashMap<>(); Map<String, String> body = new HashMap<>(); body.put("client_id", "xxxx"); body.put("client_secret", "xxxx"); body.put("grant_type", "xxxx"); body.put("redirect_uri", "xxxxx"); body.put("code", code); // Get accessToken and uid according to code HttpResponse response = HttpUtils.doPost("https://api.weibo.com", "/oauth2/access_token", "post", header, query, body); System.out.println(response.toString()); if (response.getStatusLine().getStatusCode() == 200) { String json = EntityUtils.toString(response.getEntity()); SocialUser socialUser = JSON.parseObject(json, SocialUser.class); // Login or registration - Remote R oauthlogin = memberFeignService.oauthLogin(socialUser); if (oauthlogin.getCode() == 0) { MemberRespVo memberRespVo = (MemberRespVo) oauthlogin.getData("data", new TypeReference<MemberRespVo>() {}); log.info("Login user information:" + memberRespVo.toString()); } return "redirect:http://gulimall.com"; } else { return "redirect:http://auth.gulimall.com/login.html"; } }
/** * Microblog login or registration * * @param socialUser * @return */ @Override public MemberEntity loginFromWB(SocialUser socialUser) { // 1 uid determines whether you have logged in MemberEntity memberEntity = this.baseMapper.selectOne(new QueryWrapper<MemberEntity>().eq("social_uid", socialUser.getUid())); if (memberEntity == null) { // register MemberEntity regist = new MemberEntity(); // Carry access_token and uid query user information from microblog Map<String, String> header = new HashMap<>(); Map<String, String> query = new HashMap<>(); query.put("access_token", socialUser.getAccess_token()); query.put("uid", socialUser.getUid()); try { HttpResponse response = HttpUtils.doGet("https://api.weibo.com", "/2/users/show.json", "get", header, query); if (response.getStatusLine().getStatusCode() == 200) { String s = EntityUtils.toString(response.getEntity()); JSONObject jsonObject = JSON.parseObject(s); String name = jsonObject.getString("name"); String gender = jsonObject.getString("gender"); // ... regist.setNickname(name); regist.setUsername(name); regist.setGender("m".equals(gender) ? 1 : 0); // ... }else{ log.info("Microblog query user error 11:" + response.toString()); } } catch (Exception e) { log.info("Microblog query user error 22:" + e.getMessage()); } regist.setSocialUid(socialUser.getUid()); regist.setAccessToken(socialUser.getAccess_token()); regist.setExpiresIn(socialUser.getExpires_in()); this.baseMapper.insert(regist); return regist; } else { // land // Update social related fields MemberEntity update = new MemberEntity(); update.setId(memberEntity.getId()); update.setAccessToken(socialUser.getAccess_token()); update.setExpiresIn(socialUser.getExpires_in()); this.baseMapper.updateById(update); memberEntity.setAccessToken(socialUser.getAccess_token()); memberEntity.setExpiresIn(socialUser.getExpires_in()); return memberEntity; } }
Session sharing
Same domain name, different services
Subdomains, different services
Spring Session
<!-- 1. Integrate Spring Session to solve the session-sharing problem -->
<dependency>
    <groupId>org.springframework.session</groupId>
    <artifactId>spring-session-data-redis</artifactId>
</dependency>
@EnableRedisHttpSession //Integrate redis as session storage
spring.redis.host=192.168.56.10
spring.redis.port=6379
# Session sharing settings
spring.session.store-type=redis
server.servlet.session.timeout=30m
@Configuration
public class GulimallSessionConfig {

    /**
     * Specify the session cookie's domain scope and name (the session ID is stored in a cookie by default)
     * @return
     */
    @Bean
    public CookieSerializer cookieSerializer() {
        DefaultCookieSerializer cookieSerializer = new DefaultCookieSerializer();
        cookieSerializer.setDomainName("gulimall.com");
        cookieSerializer.setCookieName("GULISESSION");
        return cookieSerializer;
    }

    /**
     * JSON serialization
     * @return
     */
    @Bean
    public RedisSerializer<Object> springSessionDefaultRedisSerializer() {
        return new GenericJackson2JsonRedisSerializer();
    }
}
Single sign-on
- The login server keeps a record of the login
- When the login server redirects back, it appends the token to the URL
- The other systems process the token found on the URL; if one is present, they save the user that corresponds to the token in their own session
- Each system saves the user in its own session
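The token hand-off in the steps above can be sketched without any web framework. Everything in this example is an assumption made for illustration: the class SsoSketch, the in-memory map standing in for the login server's storage, the token query parameter name, and the example URL. It only shows the shape of the flow: issue a token, append it to the redirect URL, then resolve it on the client side.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not the implementation used in the original project.
public class SsoSketch {

    // Login server side: token -> user name (an in-memory stand-in for real storage).
    private static final Map<String, String> TOKENS = new ConcurrentHashMap<>();

    /** Login server: remember the login and build the redirect URL carrying the token. */
    public static String redirectWithToken(String username, String returnUrl) {
        String token = UUID.randomUUID().toString().replace("-", "");
        TOKENS.put(token, username);
        String separator = returnUrl.contains("?") ? "&" : "?";
        return returnUrl + separator + "token=" + token;
    }

    /** Client system: read the token parameter from the URL, or null if absent. */
    public static String extractToken(String url) {
        int q = url.indexOf('?');
        if (q < 0) {
            return null;
        }
        for (String pair : url.substring(q + 1).split("&")) {
            if (pair.startsWith("token=")) {
                return pair.substring("token=".length());
            }
        }
        return null;
    }

    /** Client system: ask the login server which user the token belongs to. */
    public static String resolveUser(String token) {
        return token == null ? null : TOKENS.get(token);
    }

    public static void main(String[] args) {
        String url = redirectWithToken("alice", "http://client.example/home");
        String user = resolveUser(extractToken(url));
        System.out.println(user); // the client would now store this user in its own session
    }
}
```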
Shopping Cart
Analysis
public static ThreadLocal<UserInfoTo> threadLocal = new ThreadLocal<>();

/**
 * Interceptor: runs before the handler method
 * @param request
 * @param response
 * @param handler
 * @return
 * @throws Exception
 */
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
    UserInfoTo userInfoTo = new UserInfoTo();
    HttpSession session = request.getSession();
    MemberRespVo member = (MemberRespVo) session.getAttribute(AuthServerConstant.LOGIN_USER);
    if (member != null) {
        // Logged in
        userInfoTo.setUserId(member.getId());
    }
    // Set the user key; getCookies() returns null when the request carries no cookies
    Cookie[] cookies = request.getCookies();
    if (cookies != null) {
        for (Cookie cookie : cookies) {
            if (cookie.getName().equals(CartConstant.TEMP_USER_COOKIE_NAME)) {
                userInfoTo.setUserKey(cookie.getValue());
                userInfoTo.setTempUser(true);
            }
        }
    }
    // If there is no temporary user, one must be assigned
    if (StringUtils.isEmpty(userInfoTo.getUserKey())) {
        String uuid = UUID.randomUUID().toString();
        userInfoTo.setUserKey(uuid);
    }
    threadLocal.set(userInfoTo);
    return true;
}

/**
 * Runs after the business logic: assign the temporary user and let the browser save it
 * @param request
 * @param response
 * @param handler
 * @param modelAndView
 * @throws Exception
 */
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
    UserInfoTo userInfoTo = threadLocal.get();
    // If the browser has no temporary user yet, be sure to save one
    if (!userInfoTo.isTempUser()) {
        // The userKey does not need to be rewritten on every request
        Cookie cookie = new Cookie(CartConstant.TEMP_USER_COOKIE_NAME, userInfoTo.getUserKey());
        cookie.setDomain("gulimall.com");
        cookie.setMaxAge(CartConstant.TEMP_USER_COOKIE_TIMEOUT);
        response.addCookie(cookie);
    }
}
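The interceptor works because a ThreadLocal gives each request thread its own slot, so the controller and service can read the user without passing it as a parameter. The sketch below shows that isolation in plain Java. The class name is made up for illustration, and the remove() call is a suggestion rather than something the original interceptor does: on pooled server threads, clearing the slot when the request finishes keeps one request's value from leaking into the next.

```java
// Hypothetical demo class, not part of the original project.
public class ThreadLocalDemo {

    private static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    /** Sets a value on a worker thread and returns what that worker read back. */
    public static String seenByWorker(String value) {
        String[] seen = new String[1];
        Thread worker = new Thread(() -> {
            CURRENT_USER.set(value);
            try {
                seen[0] = CURRENT_USER.get();
            } finally {
                CURRENT_USER.remove(); // clear the slot before the thread is reused
            }
        });
        worker.start();
        try {
            worker.join(); // join() makes the worker's write to seen[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    /** The calling thread never set a value, so its own slot stays empty. */
    public static String seenByCaller() {
        return CURRENT_USER.get();
    }

    public static void main(String[] args) {
        System.out.println(seenByWorker("user-1"));
        System.out.println(seenByCaller());
    }
}
```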
/** * Add item to cart * <p> * RedirectAttributes ra * ra.addFlashAttribute();The data can be retrieved on the page by putting it in the session, but it can only be retrieved once * ra.addAttribute("skuId",skuId);Put data after url * * @return */ @GetMapping("/addToCart") public String addToCart(@RequestParam("skuId") Long skuId, @RequestParam("num") Integer num, Model model, RedirectAttributes redirectAttributes) throws ExecutionException, InterruptedException { cartService.addToCart(skuId, num); // model.addAttribute("item",cartItem); // return "success"; redirectAttributes.addAttribute("skuId", skuId); return "redirect:http://cart.gulimall.com/addToCartSuccess.html"; } /** * Jump to success page * * @param skuId * @param model * @return */ @GetMapping("/addToCartSuccess.html") public String addToCartSuccessPage(@RequestParam(value = "skuId") Long skuId, Model model) { // //Redirect to success page. Just query the shopping cart data again CartItem item = cartService.getCartItem(skuId); model.addAttribute("item", item); return "success"; }
private static final String CART_PREFIX = "gulimall:cart:"; @Autowired StringRedisTemplate redisTemplate; @Autowired ThreadPoolExecutor executor; @Autowired ProductFeignService productFeignService; /** * Add / modify shopping cart * * @param skuId * @param num * @return * @throws ExecutionException * @throws InterruptedException */ @Override public CartItem addToCart(Long skuId, Integer num) throws ExecutionException, InterruptedException { BoundHashOperations<String, Object, Object> cartOps = getCartOps(); String res = (String) cartOps.get(skuId.toString()); if (StringUtils.isEmpty(res)) { // Shopping cart addition CartItem cartItem = new CartItem(); // Query product information - Remote CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> { R info = productFeignService.info(skuId); SkuInfoVo skuInfo = (SkuInfoVo) info.getData("skuInfo", new TypeReference<SkuInfoVo>() { }); cartItem.setSkuId(skuId); cartItem.setCheck(true); cartItem.setCount(num); cartItem.setImage(skuInfo.getSkuDefaultImg()); cartItem.setTitle(skuInfo.getSkuTitle()); cartItem.setPrice(skuInfo.getPrice()); }, executor); // sku combination information CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> { List<String> skuSaleAttrValues = productFeignService.getSkuSaleAttrValues(skuId); cartItem.setSkuAttr(skuSaleAttrValues); }, executor); CompletableFuture.allOf(completableFuture1, completableFuture2).get(); // block // Add goods to the shopping cart (redis) cartOps.put(skuId.toString(), JSON.toJSONString(cartItem)); return cartItem; } else { // Shopping cart (existing) modification CartItem cartItem = JSON.parseObject(res, CartItem.class); cartItem.setCount(cartItem.getCount() + num); cartOps.put(skuId.toString(), JSON.toJSONString(cartItem)); return cartItem; } } /** * Get cartItem * * @param skuId * @return */ @Override public CartItem getCartItem(Long skuId) { BoundHashOperations<String, Object, Object> cartOps = getCartOps(); String s = 
(String) cartOps.get(skuId.toString()); CartItem cartItem = JSON.parseObject(s, CartItem.class); return cartItem; } /** * Get shopping cart * Merge logged in / unlisted shopping cart items * * @return */ @Override public Cart getCart() throws ExecutionException, InterruptedException { Cart cart = new Cart(); UserInfoTo userInfoTo = CartInterceptor.threadLocal.get(); String cartKey = ""; if (userInfoTo.getUserId() != null) { // land // Data at login cartKey = CART_PREFIX + userInfoTo.getUserId(); // Data when not logged in String tempCartKey = CART_PREFIX + userInfoTo.getUserKey(); List<CartItem> tempCartItems = getCartItems(tempCartKey); // merge if (tempCartItems != null) { for (CartItem cartItem : tempCartItems) { addToCart(cartItem.getSkuId(), cartItem.getCount()); } // Empty temporary data clearCart(tempCartKey); } List<CartItem> cartItems = getCartItems(cartKey); cart.setItems(cartItems); } else { // Not logged in cartKey = CART_PREFIX + userInfoTo.getUserKey(); List<CartItem> cartItems = getCartItems(cartKey); cart.setItems(cartItems); } return cart; } /** * Modify default selection button * @param skuId * @param check */ @Override public void checkItem(Long skuId, Integer check) { BoundHashOperations<String, Object, Object> cartOps = getCartOps(); CartItem cartItem = getCartItem(skuId); cartItem.setCheck(check==1?true:false); String s = JSON.toJSONString(cartItem); cartOps.put(skuId.toString(),s); } /** * Number of shopping items * @param skuId * @param num */ @Override public void changeItemCount(Long skuId, Integer num) { BoundHashOperations<String, Object, Object> cartOps = getCartOps(); CartItem cartItem = getCartItem(skuId); cartItem.setCount(num); String s = JSON.toJSONString(cartItem); cartOps.put(skuId.toString(),s); } /** * Shopping item deletion * @param skuId */ @Override public void deleteItem(Long skuId) { BoundHashOperations<String, Object, Object> cartOps = getCartOps(); cartOps.delete(skuId.toString()); } /** * empty cart * * @param cartKey 
*/ private void clearCart(String cartKey) { redisTemplate.delete(cartKey); } /** * Get all cart items in the cart * Not logged in * redis based * * @param cartKey * @return */ private List<CartItem> getCartItems(String cartKey) { BoundHashOperations<String, Object, Object> ops = redisTemplate.boundHashOps(cartKey); List<Object> values = ops.values(); if (values != null && values.size() > 0) { List<CartItem> collect = values.stream().map(item -> { String s = (String) item; CartItem cartItem = JSON.parseObject(s, CartItem.class); return cartItem; }).collect(Collectors.toList()); return collect; } return null; } /** * Get the shopping cart we want to operate * Hass storage based on redis * gulimall:cart:1 * * @return */ public BoundHashOperations<String, Object, Object> getCartOps() { UserInfoTo userInfoTo = CartInterceptor.threadLocal.get(); String cartKey = ""; if (userInfoTo.getUserId() != null) { cartKey = CART_PREFIX + userInfoTo.getUserId(); } else { cartKey = CART_PREFIX + userInfoTo.getUserKey(); } BoundHashOperations<String, Object, Object> operations = redisTemplate.boundHashOps(cartKey); return operations; }
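The `addToCart` method above runs two remote lookups in parallel with `CompletableFuture.runAsync` and then blocks on `CompletableFuture.allOf(...)` before writing to Redis. A minimal, self-contained sketch of that pattern follows. `fetchTitle` and `fetchSaleAttrs` are hypothetical stand-ins for the Feign calls, and `CartItemSketch` is a simplified placeholder for `CartItem`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified placeholder for CartItem.
class CartItemSketch {
    volatile String title;
    volatile List<String> attrs;
}

class ParallelLookup {
    // Hypothetical stand-ins for the two remote calls.
    static String fetchTitle(long skuId) { return "sku-" + skuId; }
    static List<String> fetchSaleAttrs(long skuId) { return Arrays.asList("color:black", "size:L"); }

    static CartItemSketch buildItem(long skuId) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CartItemSketch item = new CartItemSketch();
            // Each task fills a different field, so the two tasks never write the same data.
            CompletableFuture<Void> titleTask =
                    CompletableFuture.runAsync(() -> item.title = fetchTitle(skuId), executor);
            CompletableFuture<Void> attrTask =
                    CompletableFuture.runAsync(() -> item.attrs = fetchSaleAttrs(skuId), executor);
            // Block until both tasks have finished; join() rethrows failures unchecked.
            CompletableFuture.allOf(titleTask, attrTask).join();
            return item;
        } finally {
            executor.shutdown();
        }
    }
}
```

The elapsed time is roughly that of the slower lookup rather than the sum of both, which is the reason for running them on the pool.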
Message queue
Introduction to RabbitMQ
RabbitMQ is an open-source implementation of AMQP (Advanced Message Queuing Protocol), written in Erlang.
Core concept
- Message: a message is unnamed and consists of a message header and a message body. The body is opaque, while the header is a set of optional attributes, including routing-key, priority (relative to other messages), delivery-mode (indicating that the message may need persistent storage), and so on.
- Publisher: the producer of messages; a client application that publishes messages to an exchange.
- Exchange: receives the messages sent by producers and routes them to queues in the server.
- There are four types of exchange: direct (default), fanout, topic, and headers. Each type uses a different policy for forwarding messages.
- Queue: the message queue, which holds messages until they are sent to consumers. It is the container of messages and also their destination. A message can be put into one or more queues, where it stays until a consumer connects to the queue and takes it away.
- Binding: the association between a queue and an exchange. A binding is a routing rule that connects the exchange and the queue based on the routing key, so an exchange can be understood as a routing table made up of bindings. The relationship between exchanges and queues can be many-to-many.
- Connection: a network connection, such as a TCP connection.
- Channel: an independent bidirectional data stream within a multiplexed connection. A channel is a virtual connection established inside a real TCP connection. AMQP commands are sent over channels; publishing messages, subscribing to queues, and receiving messages are all done through a channel. Because establishing and tearing down a TCP connection is expensive for the operating system, channels were introduced so that a single TCP connection can be reused.
- Consumer: the consumer of messages; a client application that takes messages from a queue.
- Virtual Host: represents a group of exchanges, queues, and related objects. A virtual host is a separate server domain that shares the same authentication and encryption environment. Each vhost is essentially a mini RabbitMQ server with its own queues, exchanges, bindings, and permission mechanism. The vhost is a basic AMQP concept and must be specified when connecting. The default vhost of RabbitMQ is /.
- Broker: the message queue server entity.
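To make the routing role of exchanges and bindings concrete, here is a small in-memory sketch of how the direct, fanout, and topic types could decide which queues receive a message. It is a simplified model written for illustration (the class `ExchangeSketch` is hypothetical and is not RabbitMQ code). The headers type is left out, and the topic rules assumed here are the usual ones: `*` matches exactly one word and `#` matches zero or more words.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ExchangeSketch {
    enum Type { DIRECT, FANOUT, TOPIC }

    private final Type type;
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    ExchangeSketch(Type type) { this.type = type; }

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Returns the queues that would receive a message published with routingKey.
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : bindings.entrySet()) {
            boolean match;
            switch (type) {
                case FANOUT: match = true; break;                           // ignores the key
                case DIRECT: match = e.getKey().equals(routingKey); break;  // exact match
                default:     match = topicMatches(e.getKey().split("\\."), 0,
                                                  routingKey.split("\\."), 0);
            }
            if (match) {
                for (String q : e.getValue()) {
                    if (!targets.contains(q)) targets.add(q); // one copy per queue
                }
            }
        }
        return targets;
    }

    // Word-by-word comparison of a topic pattern against a routing key.
    static boolean topicMatches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) return k == key.length;
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words
            for (int next = k; next <= key.length; next++) {
                if (topicMatches(pattern, p + 1, key, next)) return true;
            }
            return false;
        }
        if (k == key.length) return false;
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            return topicMatches(pattern, p + 1, key, k + 1);
        }
        return false;
    }
}
```

With a direct exchange and a binding key of `hello.java`, a message published with `hello123.java` matches no queue, which is the situation in which the return callback shown later in these notes fires.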
install
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
docker update rabbitmq --restart=always
Management console: http://192.168.56.10:15672 (username guest, password guest)
exchange type
Code test
Send to queue
Copy Configuration --server.port=9001
- When several instances of the order service are started, a given message is received by only one client (by default it is handed to one of the instances at random).
- A listener receives the next message from the queue only after the current message has been fully processed and the handler method has returned.
@Autowired AmqpAdmin amqpAdmin; @Autowired RabbitTemplate rabbitTemplate; /** * 1,How to create exchange [Hello Java exchange], Queue and Binding * 1),Create using AmqpAdmin * 2,How to send and receive messages * * * Using RabbitMQ * * 1,Introducing amqp scenarios; RabbitAutoConfiguration will take effect automatically * * * * 2,The container is automatically configured * * RabbitTemplate,AmqpAdmin,CachingConnectionFactory,RabbitMessagingTemplate; * * All properties are spring rabbitmq * * @ConfigurationProperties(prefix = "spring.rabbitmq") * * public class RabbitProperties * * * * 3,Configure spring.com in the configuration file Rabbitmq information * * 4,@EnableRabbit: @EnableXxxxx;Turn on function * * 5,Listen for messages: use @ RabbitListener; @ EnableRabbit is required * * @RabbitListener: Class + method (just listen to which queues) */ @Test public void createExchange() { DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false); amqpAdmin.declareExchange(directExchange); log.info("Exchange[{}]Created successfully", "hello-java-exchange"); } @Test public void createQueue() { Queue queue = new Queue("hello-java-queue", true, false, false); amqpAdmin.declareQueue(queue); log.info("Queue[{}]Created successfully", "hello-java-queue"); } @Test public void createBinding() { //(String destination, // DestinationType destinationType [destination type], // String exchange, // String routingKey, //Map < string, Object > arguments) //Bind the switch specified by exchange with the destination, and use routingKey as the specified routing key Binding binding = new Binding( "hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange", "hello.java", null ); amqpAdmin.declareBinding(binding); log.info("Binding[{}]Created successfully", "hello-java-binding"); } @Test public void sendMessageTest() { // String s = "hello"; for (int i = 0; i < 10; i++) { if (i%2 == 0){ OrderEntity orderEntity = new OrderEntity(); orderEntity.setId(8L); 
orderEntity.setCreateTime(new Date()); orderEntity.setNote("note" + i); rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderEntity); log.info("information[{}]Sent successfully", orderEntity); }else{ OrderItemEntity orderItemEntity = new OrderItemEntity(); orderItemEntity.setSkuName("name" + i); rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderItemEntity); log.info("information[{}]Sent successfully", orderItemEntity); } } }
listen queue
@RabbitListener(queues = {"hello-java-queue"}) @Service("orderItemService") public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService { /** * queues: Declare all queues that need to listen * * org.springframework.amqp.core.Message * * You can write the type of parameter * 1,Message message: Native message details. Head + body * 2,T<Type of message sent > orderreturnreasonentity content; * 3,Channel channel: Current data transmission channel * * Queue: A lot of people can listen. As long as a message is received, the queue deletes the message, and only one receives the message * Scenario: * 1),Order service starts multiple; Only one client can receive the same message (randomly to the service by default) * 2), When only one message is completely processed and the method runs, we can receive the next message (queue) */ @RabbitHandler public void recieveMessage(Message message, OrderEntity content) { // byte[] body = message.getBody(); // System.out.println("Arrays:"); // System.out.println(Arrays.toString(body)); // System.out.println("header:"); // System.out.println(message.getMessageProperties()); System.out.println("Message received:" + message + "===>content:" + content); } @RabbitHandler public void recieveMessage(Message message, OrderItemEntity content) { // byte[] body = message.getBody(); // System.out.println("Arrays:"); // System.out.println(Arrays.toString(body)); // System.out.println("header:"); // System.out.println(message.getMessageProperties()); System.out.println("Message received:" + message + "===>content:" + content); } }
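The comment in the listener above states that many consumers can listen on one queue, but each message is delivered to only one of them. That behaviour can be sketched in-process with a `BlockingQueue` standing in for the RabbitMQ queue. This is only a model of the delivery semantics under that assumption, not of the broker itself, and `CompetingConsumers` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class CompetingConsumers {
    // Starts the given number of consumers on one queue, publishes the messages,
    // and returns every delivery that happened as "consumer-N:msg-M".
    static List<String> run(int consumers, int messages) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        List<String> deliveries = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages);
        List<Thread> threads = new ArrayList<>();

        for (int c = 0; c < consumers; c++) {
            final int id = c;
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        // take() removes the message, so no other consumer can see it;
                        // the loop only asks for the next one after this one is handled
                        Integer msg = queue.take();
                        deliveries.add("consumer-" + id + ":msg-" + msg);
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop signal
                }
            });
            t.setDaemon(true);
            t.start();
            threads.add(t);
        }

        for (int m = 0; m < messages; m++) queue.add(m);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        for (Thread t : threads) t.interrupt();
        return deliveries;
    }
}
```

Which consumer gets which message varies from run to run, but the number of deliveries always equals the number of messages.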
reliability
@Configuration public class MyRabbitConfig { @Autowired RabbitTemplate rabbitTemplate; @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } /** * Customize RabbitTemplate * 1,The server calls back when it receives a message * 1,spring.rabbitmq.publisher-confirms=true * 2,Set confirmation callback ConfirmCallback * 2,The message arrives at the queue correctly for callback * 1, spring.rabbitmq.publisher-returns=true * spring.rabbitmq.template.mandatory=true * 2,Set the confirmation callback ReturnCallback * * 3,Consumer side confirmation (ensure that each message is consumed correctly, and then the broker can delete this message). * spring.rabbitmq.listener.simple.acknowledge-mode=manual Manual sign in * 1,The default is automatic confirmation. As long as the message is received, the client will automatically confirm and the server will remove the message * Question: * We received many messages and automatically replied to the server ack. Only one message was processed successfully and went down. Message loss will occur; * Consumer manual confirmation mode. As long as we don't tell MQ clearly, the goods are signed. Without Ack, * The message is always unacketed. Even if the Consumer goes down. The message will not be lost. It will become Ready again. 
The next time a new Consumer connects, it will be sent to him * 2,How to sign in: * channel.basicAck(deliveryTag,false);Sign for; If the business is successfully completed, it should be signed in * channel.basicNack(deliveryTag,false,true);Refusal of visa; Business failed, visa refused */ @PostConstruct //After the MyRabbitConfig object is created, execute this method public void initRabbitTemplate() { //Set producer p arrival broker agent acknowledgement callback rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * * 1,ack=true as long as the message arrives at the Broker * Whether it reaches the queue or not * @param correlationData Unique associated data of the current message (this is the unique id of the message) * @param b Whether the message was received successfully * @param s Reasons for failure */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("confirm..."); System.out.println("correlationData==>" + correlationData); System.out.println("b====>" + b); System.out.println("s====>" + s); System.out.println("-----------------------"); /* * confirm... 
correlationData==>CorrelationData [id=462eeb8d-0e4a-4f0e-a2e1-5eed028a9408] b====>true s====>null ----------------------- * */ } }); //Set acknowledgement callback for message arrival queue rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * This failure callback is triggered as long as the message is not delivered to the specified queue * @param message Post failed message details * @param i Status code of reply * @param s Text content of reply * @param s1 The message was sent by the exchange * @param s2 Which routing key was used for this message at that time */ @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("returned..."); System.out.println("message==>" + message); System.out.println("i==>" + i); System.out.println("s1====>" + s1); System.out.println("s2====>" + s2); System.out.println("*****************"); /* * returned... message==>(Body:'{"id":null,"orderId":null,"orderSn":null,"spuId":null,"spuName":null,"spuPic":null,"spuBrand":null,"categoryId":null,"skuId":null,"skuName":"name9","skuPic":null,"skuPrice":null,"skuQuantity":null,"skuAttrsVals":null,"promotionAmount":null,"couponAmount":null,"integrationAmount":null,"realAmount":null,"giftIntegration":null,"giftGrowth":null}' MessageProperties [headers={spring_returned_message_correlation=f3dfbbfc-3ab6-413f-b9eb-47de4bf74b47, __TypeId__=com.atguigu.gulimall.order.entity.OrderItemEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) i==>312 s1====>hello-java-exchange s2====>hello123.java ***************** */ } }); } }
/** * queues: Declare all queues that need to listen * * org.springframework.amqp.core.Message * * You can write the type of parameter * 1,Message message: Native message details. Head + body * 2,T<Type of message sent > orderreturnreasonentity content; * 3,Channel channel: Current data transmission channel * * Queue: A lot of people can listen. As long as a message is received, the queue deletes the message, and only one receives the message * Scenario: * 1),Order service starts multiple; Only one client can receive the same message (randomly to the service by default) * 2), When only one message is completely processed and the method runs, we can receive the next message (queue) * * */ @RabbitHandler public void recieveMessage(Message message, OrderEntity content, Channel channel) { System.out.println("Message processing completed=>"+content.getNote()); // ack is in automatic mode by default and automatically signs in (Rookie post station) // ack mechanism - manual mode (personal sign in express) // Sign in sign long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.println("deliveryTag==>"+deliveryTag); // Signed in goods, non batch mode try { if (deliveryTag %2 ==0){ // Sign for channel.basicAck(deliveryTag,false); System.out.println("Signed for the goods..."+deliveryTag); }else{ // rejection // Both basicNack and basicReject can be returned // basicNack(long deliveryTag, boolean multiple, boolean requeue) // basicReject(long deliveryTag, boolean requeue) // Flag, batch, whether to put it back in the queue channel.basicNack(deliveryTag,false,true); System.out.println("No sign for the goods..."+deliveryTag); } } catch (IOException e) { e.printStackTrace(); } }
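The manual acknowledgement flow described above (a delivered message stays Unacked until it is acked, goes back to Ready on a nack with requeue, and also goes back to Ready if the consumer disconnects) can be modelled with a small single-threaded state machine. `AckQueueSketch` is a hypothetical illustration of those state transitions, not broker code. Putting requeued messages back at the head of the queue is an assumption of this model.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AckQueueSketch {
    private final Deque<String> ready = new ArrayDeque<>();           // waiting for delivery
    private final Map<Long, String> unacked = new LinkedHashMap<>();  // delivered, not confirmed
    private long nextTag = 1;

    void publish(String msg) { ready.addLast(msg); }

    // Ready -> Unacked. Returns the delivery tag, or -1 when nothing is ready.
    long deliver() {
        String msg = ready.pollFirst();
        if (msg == null) return -1;
        long tag = nextTag++;
        unacked.put(tag, msg);
        return tag;
    }

    // Models basicAck(tag, false): the message is removed for good.
    void ack(long tag) { unacked.remove(tag); }

    // Models basicNack(tag, false, requeue): drop the message or put it back.
    void nack(long tag, boolean requeue) {
        String msg = unacked.remove(tag);
        if (msg != null && requeue) ready.addFirst(msg);
    }

    // A consumer that goes down without acking loses nothing:
    // everything it held becomes Ready again, in the original order.
    void consumerDisconnected() {
        List<String> pending = new ArrayList<>(unacked.values());
        unacked.clear();
        for (int i = pending.size() - 1; i >= 0; i--) ready.addFirst(pending.get(i));
    }

    int readyCount() { return ready.size(); }
    int unackedCount() { return unacked.size(); }
}
```

In automatic mode the equivalent of `ack` would happen at delivery time, so a crash after delivery but before processing would lose the message; the manual mode above avoids that.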
Order service
Order confirmation page
/**
 * Feign interceptor.
 * Fixes the problem of request headers being lost on Feign remote calls.
 */
@Configuration
public class GuliFeignConfig {

    @Bean
    public RequestInterceptor requestInterceptor() {
        return new RequestInterceptor() {
            @Override
            public void apply(RequestTemplate requestTemplate) {
                // 1. RequestContextHolder (thread-bound context) gives us the request that just came in
                ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
                System.out.println("RequestInterceptor intercept");
                if (attributes != null) {
                    HttpServletRequest request = attributes.getRequest(); // the original (incoming) request
                    if (request != null) {
                        // Copy the Cookie header of the original request onto the new Feign request
                        String header = request.getHeader("Cookie");
                        requestTemplate.header("Cookie", header);
                    }
                }
            }
        };
    }
}
Freight (simulated, of little practical use)
/**
 * Get the freight based on the address id.
 *
 * @param addrId
 * @return
 */
@Override
public FareVo getFare(Long addrId) {
    FareVo fareVo = new FareVo();
    R r = memberFeignService.addrInfo(addrId);
    MemberAddressVo data = (MemberAddressVo) r.getData("memberReceiveAddress", new TypeReference<MemberAddressVo>() {
    });
    if (data != null) {
        // Simulated freight: the last digit of the phone number is used as the fare
        String phone = data.getPhone();
        String substring = phone.substring(phone.length() - 1, phone.length());
        BigDecimal bigDecimal = new BigDecimal(substring);
        fareVo.setAddress(data);
        fareVo.setFare(bigDecimal);
        return fareVo;
    }
    return null;
}
Interface idempotency
Interface idempotency means that the results of one request or multiple requests initiated by the user for the same operation are consistent.
Idempotent solution
token mechanism
1. The server provides an interface for issuing tokens. The client must obtain a token before executing the business operation, and the server saves the token to Redis.
2. When the business interface is then called, the token is carried along with the request, usually in a request header.
3. The server checks whether the token exists in Redis. If it does, this is the first request: the server deletes the token and continues with the business. If it does not, the request is treated as a repeat.
4. Token acquisition, comparison, and deletion must be atomic.
- If redis.get(token), token.equals, and redis.del(token) are not executed atomically, then under high concurrency several requests may read the same token, all pass the check, and run the business concurrently.
- A Lua script in Redis can perform the whole operation atomically: if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end
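The need for an atomic compare-and-delete can be demonstrated without Redis. In the sketch below, `ConcurrentMap.remove(key, value)` plays the role of the Lua script: it removes the entry only if the stored value equals the submitted one, as a single atomic step. `OrderTokenStore` and its methods are hypothetical names, and the in-memory map is an assumption standing in for Redis.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class OrderTokenStore {
    // userId -> token; stands in for the Redis key space
    private final ConcurrentMap<String, String> tokens = new ConcurrentHashMap<>();

    // Step 1: issue a token and remember it on the server side.
    String issue(String userId) {
        String token = UUID.randomUUID().toString().replace("-", "");
        tokens.put(userId, token);
        return token;
    }

    // Steps 3 and 4: compare and delete in one atomic operation.
    // Returns true only for the first request that presents the right token.
    boolean verifyAndDelete(String userId, String submittedToken) {
        return submittedToken != null && tokens.remove(userId, submittedToken);
    }

    // Fires the same token from several threads at once and counts how many pass.
    static int concurrentWinners(int threads) {
        OrderTokenStore store = new OrderTokenStore();
        String token = store.issue("user-1");
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    start.await(); // line all threads up before releasing them
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (store.verifyAndDelete("user-1", token)) winners.incrementAndGet();
            });
            t.start();
            workers.add(t);
        }
        start.countDown();
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return winners.get();
    }
}
```

If `verifyAndDelete` were written as a separate get, equals, and remove, several of the concurrent threads could pass the check before any of them deleted the token.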
Various locking mechanisms
- Database pessimistic lock
- select * from xxxx where id = 1 for update;
- Pessimistic locks are generally used with transactions, and the data locking time may be very long, which should be selected according to the actual situation.
- Note also that the id field must be a primary key or a unique index; otherwise the statement may lock the whole table, which is very troublesome to deal with.
- Database optimistic lock
This method is suitable for update scenarios: update t_goods set count = count - 1, version = version + 1 where good_id = 2 and version = 1. The version number of the current commodity is read before operating on the inventory and is then carried along with the operation. To walk through it: the first time we operate on the inventory we read version 1 and call the inventory service, and the version becomes 2. Suppose something goes wrong while returning to the order service, so the order service calls the inventory service again. The version it passes is still 1, so the SQL statement above changes nothing, because version is now 2 and the where condition no longer holds. This ensures that no matter how many times the call is made, it is processed only once. Optimistic locking is mainly used for read-heavy, write-light workloads.
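The version check in the update statement can be modelled in memory. In this sketch, `StockRow` is a hypothetical stand-in for one row of t_goods, and `decrement` behaves like the SQL above: it changes the row only when the caller's version matches, and returns the number of affected rows.

```java
class StockRow {
    private int count;
    private int version;

    StockRow(int count, int version) {
        this.count = count;
        this.version = version;
    }

    // Models: update t_goods set count = count - 1, version = version + 1
    //         where good_id = ? and version = expectedVersion
    // Returns the number of affected rows (1 or 0).
    synchronized int decrement(int expectedVersion) {
        if (version != expectedVersion) {
            return 0; // stale version: the where condition does not hold
        }
        count--;
        version++;
        return 1;
    }

    synchronized int getCount() { return count; }
    synchronized int getVersion() { return version; }
}
```

A retry that still carries the old version gets 0 affected rows back, so the caller can tell that the deduction has already been applied.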
Business layer distributed lock
If multiple machines may process the same data at the same time, for example when scheduled tasks on several machines all receive the same data to process, we can add a distributed lock to lock the data and release the lock after processing. Whoever acquires the lock must first check whether the data has already been processed.
Various unique constraints
- Database unique constraint
- Data should be inserted under a unique index, for example on the order number, so that two records for the same order cannot be inserted. This prevents duplication at the database level.
- This mechanism uses the unique constraint of the database primary key to solve the idempotency problem in insert scenarios. However, the primary key must not be an auto-increment key; the business has to generate a globally unique primary key.
- With database and table sharding, the routing rules must ensure that the same request lands in the same database and table. Otherwise the primary-key constraint has no effect, because primary keys in different databases and tables are unrelated.
- Redis set de-duplication
A lot of data needs to be processed, and each piece may be processed only once. For example, we can compute the MD5 of the data and put it into a Redis set. Before processing a piece of data, first check whether its MD5 is already in the set; if it is, skip the data.
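A minimal sketch of the MD5 fingerprint check follows, using the JDK's `MessageDigest`. An in-memory concurrent set stands in for the Redis set (with Redis, the result of `SADD` would serve the same purpose as the return value of `Set.add`). `Md5Deduplicator` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class Md5Deduplicator {
    // Fingerprints already seen; stands in for the Redis set.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Hex-encoded MD5 of the UTF-8 bytes of the data.
    static String md5Hex(String data) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(data.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // Returns true the first time a piece of data is seen, false for repeats.
    // Set.add is atomic here, so the check and the insert cannot be interleaved.
    boolean firstTime(String data) {
        return seen.add(md5Hex(data));
    }
}
```

A caller would process the data only when `firstTime` returns true and skip it otherwise.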
De-duplication table
Use the order number orderNo as the unique index of a de-duplication table. Insert the order number into the de-duplication table and then perform the business operation, both in the same transaction. A repeated request then fails on the unique constraint of the de-duplication table, which avoids the idempotency problem. Note that the de-duplication table and the business table must be in the same database, so that if the business operation fails, the row in the de-duplication table is rolled back by the same transaction. This keeps the data consistent.
The Redis de-duplication described earlier also belongs to this category.
Global request unique id
When the interface is called, a unique id is generated and saved to a Redis set (for de-duplication). If the id already exists, the request has already been processed.
You can use nginx to set a unique ID for each request: proxy_set_header X-Request-Id $request_id;
Lock inventory
Generate order + lock stock code
@PostMapping("/submitOrder") public String submitOrder(OrderSubmitVo vo, Model model, RedirectAttributes redirectAttributes) { System.out.println("Order submission data..." + vo); try { SubmitOrderResponseVo submitOrderResponseVo = orderService.submitOrder(vo); System.out.println("Create order return:" + submitOrderResponseVo); if (submitOrderResponseVo.getCode() == 0) { // Order submitted successfully model.addAttribute("submitOrderResp", submitOrderResponseVo); return "pay"; } else { // Failed, redirect order confirmation page String msg = "Order failed:"; switch (submitOrderResponseVo.getCode()) { case 1: msg += "The order information is expired, please refresh and submit again"; break; case 2: msg += "The price of the order has changed. Please confirm and submit again"; break; case 3: msg += "Failed to lock the inventory. The commodity inventory is insufficient"; break; } redirectAttributes.addAttribute("msg", msg); } } catch (Exception e) { System.out.println(e.getMessage()); if (e instanceof NoStockException) { // If the repository is abnormal, a business error is thrown String message = e.getMessage(); redirectAttributes.addFlashAttribute("msg", message); } e.printStackTrace(); } return "redirect:http://order.gulimall.com/toTrade"; }
/** * place order * * @param vo * @return */ @Transactional @Override public SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) { SubmitOrderResponseVo submitOrderResponseVo = new SubmitOrderResponseVo(); MemberRespVo memberRespVo = LoginUserInterceptor.loginUser.get(); confirmVoThreadLocal.set(vo); submitOrderResponseVo.setCode(0); //1. Authentication token String orderToken = vo.getOrderToken(); String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; //Atomic authentication token and deletion token Long execute = stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberRespVo.getId()), orderToken); // String redisToken = stringRedisTemplate.opsForValue().get(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberRespVo.getId()); // If (ordertoken! = null & & redistoken. Equals (ordertoken)) {/ / atomicity cannot be guaranteed. You should use the lua script of redis // } if (execute == 0L) { // Token validation failed submitOrderResponseVo.setCode(1); return submitOrderResponseVo; } else { // Token authentication succeeded // Create order, order item and other information OrderCreateTo order = createOrder(); // 2. If the difference between the payable amount transferred from the price verification client and the payable amount of the generated order is less than 0.01, it can be passed BigDecimal payPrice = vo.getPayPrice(); BigDecimal payAmount = order.getOrder().getPayAmount(); if (Math.abs(payPrice.subtract(payAmount).doubleValue()) < 0.01) {// Pass the price inspection // 3. Save order saveOrder(order); // 4. Inventory lock. Roll back the order data as long as there are exceptions. 
WareSkuLockVo lockVo = new WareSkuLockVo(); lockVo.setOrderSn(order.getOrder().getOrderSn()); List<OrderItemVo> collect = order.getOrderItems().stream().map(item -> { OrderItemVo orderItemVo = new OrderItemVo(); orderItemVo.setSkuId(item.getSkuId()); orderItemVo.setCount(item.getSkuQuantity()); orderItemVo.setTitle(item.getSkuName()); return orderItemVo; }).collect(Collectors.toList()); lockVo.setLocks(collect); R r = wmsFeignService.orderLockStock(lockVo); if (r.getCode() == 0) { // Lock successful submitOrderResponseVo.setOrder(order.getOrder()); return submitOrderResponseVo; } else { // Lock failed String msg = (String) r.get("msg"); throw new NoStockException(msg); } } else { // Price verification failed submitOrderResponseVo.setCode(2); return submitOrderResponseVo; } } } /** * Save master order and order items * * @param order */ @Transactional public void saveOrder(OrderCreateTo order) { // Save master order OrderEntity orderEntity = order.getOrder(); orderEntity.setModifyTime(new Date()); this.save(orderEntity); // Save order item List<OrderItemEntity> orderItems = order.getOrderItems(); orderItemService.saveBatch(orderItems); } /** * Create order + order item * * @return */ private OrderCreateTo createOrder() { OrderCreateTo orderCreateTo = new OrderCreateTo(); // Generate order number String orderSn = IdWorker.getTimeId(); // 1 generate master order OrderEntity orderEntity = buildOrder(orderSn); // 2 order item List<OrderItemEntity> orderItemEntities = buildOrderItems(orderSn); // 3. 
Calculate the price, points and other related information, and supplement the main order information computePrice(orderEntity, orderItemEntities); orderCreateTo.setOrder(orderEntity); orderCreateTo.setOrderItems(orderItemEntities); return orderCreateTo; } /** * Calculate the comparison between orders and order items * * @param orderEntity * @param itemEntities */ private void computePrice(OrderEntity orderEntity, List<OrderItemEntity> itemEntities) { BigDecimal total = new BigDecimal("0.0"); // Payable amount of superimposed shopping items BigDecimal coupon = new BigDecimal("0.0"); // Coupon amount of superimposed shopping items BigDecimal integration = new BigDecimal("0.0"); // Bonus points of superimposed shopping items BigDecimal promotion = new BigDecimal("0.0"); // Promotion discount amount of superimposed shopping items BigDecimal gift = new BigDecimal("0.0"); // Bonus points of superimposed shopping items BigDecimal growth = new BigDecimal("0.0"); // Gift growth value of superimposed shopping items //The total amount of the order, superimposing the total amount information of each order item for (OrderItemEntity entity : itemEntities) { coupon = coupon.add(entity.getCouponAmount()); integration = integration.add(entity.getIntegrationAmount()); promotion = promotion.add(entity.getPromotionAmount()); total = total.add(entity.getRealAmount()); gift = gift.add(new BigDecimal(entity.getGiftIntegration().toString())); growth = growth.add(new BigDecimal(entity.getGiftGrowth().toString())); } // Price information of master order orderEntity.setTotalAmount(total); // Total amount of master order (sum of payable amount of shopping items) orderEntity.setPayAmount(total.add(orderEntity.getFreightAmount())); // Payable amount of master order (total amount of order + freight) orderEntity.setPromotionAmount(promotion); // Promotion discount amount of master order orderEntity.setIntegrationAmount(integration); // Credit discount amount of master order 
orderEntity.setCouponAmount(coupon); // Coupon amount of master order // Points and other information of master order orderEntity.setIntegration(gift.intValue()); // Bonus points of master order orderEntity.setGrowth(growth.intValue()); // Gift growth value of master order } /** * Create master order * * @param orderSn * @return */ private OrderEntity buildOrder(String orderSn) { OrderEntity orderEntity = new OrderEntity(); MemberRespVo memberRespVo = LoginUserInterceptor.loginUser.get(); OrderSubmitVo orderSubmitVo = confirmVoThreadLocal.get(); orderEntity.setOrderSn(orderSn); // order number orderEntity.setMemberId(memberRespVo.getId()); // Member ID // Set freight information and get receiving address information - Remote R fare = wmsFeignService.getFare(orderSubmitVo.getAddrId()); FareVo fareResp = (FareVo) fare.getData(new TypeReference<FareVo>() { }); //Set freight information orderEntity.setFreightAmount(fareResp.getFare()); //Set consignee information orderEntity.setReceiverCity(fareResp.getAddress().getCity()); orderEntity.setReceiverDetailAddress(fareResp.getAddress().getDetailAddress()); orderEntity.setReceiverName(fareResp.getAddress().getName()); orderEntity.setReceiverPhone(fareResp.getAddress().getPhone()); orderEntity.setReceiverPostCode(fareResp.getAddress().getPostCode()); orderEntity.setReceiverProvince(fareResp.getAddress().getProvince()); orderEntity.setReceiverRegion(fareResp.getAddress().getRegion()); //Set the relevant status information of the order orderEntity.setStatus(OrderStatusEnum.CREATE_NEW.getCode()); orderEntity.setDeleteStatus(0); // Not deleted return orderEntity; } /** * Generate shopping items * Get from shopping cart * The latest commodity price will be obtained once in the shopping cart * * @param orderSn */ private List<OrderItemEntity> buildOrderItems(String orderSn) { // The latest commodity price will be obtained once in the shopping cart - Remote List<OrderItemVo> currentUserCartItems = 
cartFeignService.getCurrentUserCartItems(); if (currentUserCartItems != null && currentUserCartItems.size() > 0) { List<OrderItemEntity> collect = currentUserCartItems.stream().map(cartItem -> { OrderItemEntity orderItemEntity = buildOrderItem(cartItem); orderItemEntity.setOrderSn(orderSn); return orderItemEntity; }).collect(Collectors.toList()); return collect; } return null; } /** * Generate a shopping item * * @param cartItem * @return */ private OrderItemEntity buildOrderItem(OrderItemVo cartItem) { OrderItemEntity itemEntity = new OrderItemEntity(); //1. Order information: Order No. - obtained //2. SPU information of goods R spuInfoBySkuId = productFeignService.getSpuInfoBySkuId(cartItem.getSkuId()); SpuInfoVo data = (SpuInfoVo) spuInfoBySkuId.getData(new TypeReference<SpuInfoVo>() { }); itemEntity.setSpuId(data.getId()); itemEntity.setSpuBrand(data.getBrandId().toString()); itemEntity.setSpuName(data.getSpuName()); itemEntity.setCategoryId(data.getCatalogId()); //3. sku information of goods itemEntity.setSkuId(cartItem.getSkuId()); // skuid itemEntity.setSkuName(cartItem.getTitle()); //sku name itemEntity.setSkuPic(cartItem.getImage()); //sku default picture itemEntity.setSkuPrice(cartItem.getPrice()); //sku's price String skuAttr = StringUtils.collectionToDelimitedString(cartItem.getSkuAttr(), ";"); itemEntity.setSkuAttrsVals(skuAttr); // sku passed; merge itemEntity.setSkuQuantity(cartItem.getCount()); // sku purchase quantity //4. Offer information [no] //5. Points information - Simple Business - growth value and points equal purchase amount x quantity itemEntity.setGiftGrowth(cartItem.getPrice().multiply(new BigDecimal(cartItem.getCount().toString())).intValue()); itemEntity.setGiftIntegration(cartItem.getPrice().multiply(new BigDecimal(cartItem.getCount().toString())).intValue()); //6. 
Price information of order item itemEntity.setPromotionAmount(new BigDecimal(0)); // Promotion discount amount itemEntity.setCouponAmount(new BigDecimal(0)); // Coupon discount amount itemEntity.setIntegrationAmount(new BigDecimal(0)); //Bonus amount //Actual amount of the current order item. Total - various benefits BigDecimal multiply = itemEntity.getSkuPrice().multiply(new BigDecimal(itemEntity.getSkuQuantity().toString())); BigDecimal subtract = multiply.subtract(itemEntity.getPromotionAmount()) .subtract(itemEntity.getCouponAmount()) .subtract(itemEntity.getIntegrationAmount()); itemEntity.setRealAmount(subtract); // Payable amount of shopping items after various discounts return itemEntity; }
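The price arithmetic in buildOrderItem and computePrice can be checked in isolation. The sketch below is a minimal stand-alone version, assuming only the formulas shown above (real amount = sku price x quantity minus the three discounts; payable amount = sum of real amounts plus freight). The class and method names (PriceSketch, realAmount, payAmount) are hypothetical and not part of the project.

```java
import java.math.BigDecimal;
import java.util.List;

// Hypothetical helper that mirrors the formulas used in buildOrderItem and computePrice.
final class PriceSketch {

    // Real amount of one order item: price * quantity minus all discounts.
    static BigDecimal realAmount(BigDecimal skuPrice, int quantity,
                                 BigDecimal promotion, BigDecimal coupon, BigDecimal integration) {
        return skuPrice.multiply(BigDecimal.valueOf(quantity))
                .subtract(promotion)
                .subtract(coupon)
                .subtract(integration);
    }

    // Payable amount of the master order: sum of the item amounts plus freight.
    static BigDecimal payAmount(List<BigDecimal> itemAmounts, BigDecimal freight) {
        BigDecimal total = BigDecimal.ZERO;
        for (BigDecimal amount : itemAmounts) {
            total = total.add(amount);
        }
        return total.add(freight);
    }
}
```

Note that the points and growth values are derived with intValue(), which truncates: an item worth 39.80 yields 39 points, not 40.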
/**
 * Lock inventory for an order
 *
 * @param vo the order number and the items (sku, quantity) to lock
 * @return true if every item was locked; otherwise NoStockException is thrown
 */
@Override
@Transactional
public Boolean orderLockStock(WareSkuLockVo vo) {
    // In a real system you would normally pick the warehouse nearest to the receiving address of the order.
    // 1. For each item, find the warehouses that still have free stock: sku - quantity - warehouse ids
    List<OrderItemVo> orderItemVos = vo.getLocks();
    List<SkuWareHasStock> collect = orderItemVos.stream().map(item -> {
        SkuWareHasStock skuWareHasStock = new SkuWareHasStock();
        skuWareHasStock.setSkuId(item.getSkuId());
        skuWareHasStock.setNum(item.getCount());
        List<Long> wareIds = wareSkuDao.listWareIdHasSkuStock(item.getSkuId());
        skuWareHasStock.setWareId(wareIds);
        return skuWareHasStock;
    }).collect(Collectors.toList());
    // 2. Lock inventory
    for (SkuWareHasStock hasStock : collect) {
        boolean skuStocked = false; // true once one warehouse has locked this sku
        Long skuId = hasStock.getSkuId();
        List<Long> wareIds = hasStock.getWareId();
        if (wareIds == null || wareIds.size() == 0) {
            throw new NoStockException(skuId);
        }
        for (Long wareId : wareIds) {
            // The UPDATE affects one row only if stock - stock_locked >= num
            Long count = wareSkuDao.lockSkuStock(skuId, wareId, hasStock.getNum());
            if (count == 1) {
                skuStocked = true;
                break; // locked in this warehouse; do not lock the same sku again in the next one
            }
        }
        if (!skuStocked) {
            // No warehouse could lock this sku
            throw new NoStockException(skuId);
        }
    }
    // 3. Every item was locked successfully
    return true;
}
<!-- Find the ids of the warehouses that still have free stock for this sku -->
<select id="listWareIdHasSkuStock" resultType="java.lang.Long">
    SELECT ware_id FROM `wms_ware_sku` WHERE sku_id=#{skuId} AND stock-stock_locked>0
</select>
<!-- Lock stock of a sku in one warehouse -->
<update id="lockSkuStock">
    UPDATE `wms_ware_sku` SET stock_locked = stock_locked+#{num}
    WHERE sku_id=#{skuId} AND ware_id=#{wareId} AND stock-stock_locked>=#{num}
</update>
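The guard `stock-stock_locked>=#{num}` is what makes the lock safe: the UPDATE only touches the row when enough free stock remains, and the affected-row count tells the caller whether it worked. The following in-memory sketch illustrates that behaviour for a single sku, assuming that locking should stop at the first warehouse that succeeds. StockLockSketch and its methods are hypothetical stand-ins; the real code goes through MyBatis and the database.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// In-memory stand-in for the wms_ware_sku rows of ONE sku (hypothetical).
final class StockLockSketch {
    // wareId -> {stock, stockLocked}
    private final Map<Long, int[]> rows = new LinkedHashMap<>();

    void addWarehouse(long wareId, int stock, int stockLocked) {
        rows.put(wareId, new int[]{stock, stockLocked});
    }

    // Mirrors lockSkuStock: returns the number of affected rows (0 or 1).
    int lockSkuStock(long wareId, int num) {
        int[] row = rows.get(wareId);
        if (row != null && row[0] - row[1] >= num) {
            row[1] += num;
            return 1;
        }
        return 0;
    }

    // Tries each warehouse in insertion order and stops at the first successful lock.
    // Returns the id of the warehouse that locked the stock, or -1 if none could.
    long lockInFirstAvailable(int num) {
        for (long wareId : rows.keySet()) {
            if (lockSkuStock(wareId, num) == 1) {
                return wareId;
            }
        }
        return -1L;
    }

    int locked(long wareId) {
        return rows.get(wareId)[1];
    }
}
```

In the database the same check and increment happen inside one UPDATE statement, so two concurrent orders cannot both take the last free unit.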
Distributed transaction
reason
Local transaction
Distributed theory
Seata
Using Seata to control distributed transactions
1) Every microservice database must first create the undo_log table.
2) Install the transaction coordinator, seata-server: https://github.com/seata/seata/releases
3) Integration
   1. Import the dependency spring-cloud-starter-alibaba-seata (seata-all-0.7.1).
   2. Unzip and start seata-server.
      registry.conf: registry configuration; change the registry type to nacos (type=nacos)
      file.conf:
   3. Every microservice that takes part in a distributed transaction must wrap its own data source in Seata's DataSourceProxy.
   4. Every microservice must include
      registry.conf
      file.conf, with vgroup_mapping.{application.name}-fescar-service-group = "default"
   5. Start the services and test the distributed transaction.
   6. Mark the entry point of the global (large) transaction with @GlobalTransactional.
   7. Mark each remote branch (small) transaction with @Transactional.
CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
<!--Distributed transaction lock--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency>
registry.conf: Registry configuration; modify registry type=nacos
// DataSourceProxy proxy own data source @Configuration public class MySeataConfig { @Autowired DataSourceProperties dataSourceProperties; @Bean public DataSource dataSource(DataSourceProperties dataSourceProperties){ //properties.initializeDataSourceBuilder().type(type).build(); HikariDataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build(); if (StringUtils.hasText(dataSourceProperties.getName())) { dataSource.setPoolName(dataSourceProperties.getName()); } return new DataSourceProxy(dataSource); } }
Copy registry.conf and file.conf into every microservice. In file.conf, change the transaction group mapping: vgroup_mapping.{application.name}-fescar-service-group = "default". Annotate the entry point of the global (large) transaction with @GlobalTransactional, and each remote branch (small) transaction with @Transactional.
Order creation is a high-concurrency path, so it is not a good fit for this kind of distributed transaction.
Saving a product involves several remote calls and is a good fit for distributed transactions.
RabbitMQ delay queue
The order service declares the exchange, queues and bindings as beans, then simulates an order to test them
@Configuration
public class MyMQConfig {

    /**
     * Binding, Queue and Exchange beans in the container are created automatically
     * if they do not yet exist in RabbitMQ.
     * Once they exist in RabbitMQ, changing the @Bean properties does not overwrite them.
     */

    /**
     * Delay queue for newly created orders (messages wait here until the TTL expires)
     *
     * @return
     */
    @Bean
    public Queue orderDelayQueue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "order-event-exchange");
        arguments.put("x-dead-letter-routing-key", "order.release.order");
        arguments.put("x-message-ttl", 60000);
        //String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
        return queue;
    }

    /**
     * Queue that messages from order.delay.queue enter after they expire
     *
     * @return
     */
    @Bean
    public Queue orderReleaseOrderQueue() {
        //String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        Queue queue = new Queue("order.release.order.queue", true, false, false);
        return queue;
    }

    /**
     * Exchange
     */
    @Bean
    public Exchange orderEventExchange() {
        //String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
        return new TopicExchange("order-event-exchange", true, false);
    }

    /**
     * Binding between the exchange and order.delay.queue
     */
    @Bean
    public Binding orderCreateOrderBingding() {
        //String destination, DestinationType destinationType, String exchange, String routingKey,
        // Map<String, Object> arguments
        return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);
    }

    /**
     * Binding between the exchange and order.release.order.queue
     */
    @Bean
    public Binding orderReleaseOrderBingding() {
        //String destination, DestinationType destinationType, String exchange, String routingKey,
        // Map<String, Object> arguments
        return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null);
    }
}
@RabbitListener(queues = "order.release.order.queue") public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException { System.out.println("Overdue orders received:" + orderEntity.getOrderSn()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } @GetMapping("/test/createOrder") public String createOrder(){ OrderEntity entity = new OrderEntity(); entity.setOrderSn(UUID.randomUUID().toString()); entity.setModifyTime(new Date()); // Send message to mq rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",entity); return "ok"; }
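The delay queue has no consumer: a message sits in order.delay.queue until its TTL expires and is then dead-lettered to order.release.order.queue, where the listener picks it up. As an in-process analogy only (RabbitMQ is not involved here), the JDK's DelayQueue shows the same "not visible until the delay has passed" behaviour. DelayQueueSketch and its method names are hypothetical.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// In-process analogy of "TTL + dead-letter" using java.util.concurrent.DelayQueue.
final class DelayQueueSketch {

    // A message that only becomes available after ttlMillis.
    static final class DelayedOrder implements Delayed {
        final String orderSn;
        private final long releaseAtNanos;

        DelayedOrder(String orderSn, long ttlMillis) {
            this.orderSn = orderSn;
            this.releaseAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(ttlMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(releaseAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<DelayedOrder> delayQueue = new DelayQueue<>();

    // Plays the role of sending to order.delay.queue.
    void send(String orderSn, long ttlMillis) {
        delayQueue.put(new DelayedOrder(orderSn, ttlMillis));
    }

    // Non-blocking: returns null while no message has expired yet.
    String pollExpired() {
        DelayedOrder d = delayQueue.poll();
        return d == null ? null : d.orderSn;
    }

    // Waits up to maxWaitMillis for the next expired message (the "release" listener).
    String awaitExpired(long maxWaitMillis) {
        try {
            DelayedOrder d = delayQueue.poll(maxWaitMillis, TimeUnit.MILLISECONDS);
            return d == null ? null : d.orderSn;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

Unlike this in-memory queue, the RabbitMQ version survives a restart of the service because the queues are declared durable.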
Inventory auto unlock - queue
@Configuration
public class MyRabbitConfig {

    /**
     * Use JSON serialization for message conversion
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // /**
    //  * The exchange, queues and bindings are only created once something listens to a queue
    //  * @return
    //  */
    // @RabbitListener(queues = "stock.release.stock.queue")
    // public void handle(Message message){
    //
    // }

    /**
     * Exchange
     * @return
     */
    @Bean
    public Exchange stockEventExchange() {
        //String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
        return new TopicExchange("stock-event-exchange", true, false);
    }

    /**
     * Queue that messages enter after they expire in the delay queue
     * @return
     */
    @Bean
    public Queue stockReleaseStockQueue() {
        //String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        return new Queue("stock.release.stock.queue", true, false, false);
    }

    /**
     * Delay queue: messages wait here until the TTL expires
     * @return
     */
    @Bean
    public Queue stockDelayQueue() {
        /**
         * x-dead-letter-exchange: stock-event-exchange
         * x-dead-letter-routing-key: stock.release
         * x-message-ttl: 120000
         */
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "stock-event-exchange");
        args.put("x-dead-letter-routing-key", "stock.release");
        args.put("x-message-ttl", 120000);
        return new Queue("stock.delay.queue", true, false, false, args);
    }

    /**
     * Binding between the exchange and stock.release.stock.queue
     * @return
     */
    @Bean
    public Binding stockReleaseBinding() {
        /**
         * String destination, DestinationType destinationType, String exchange, String routingKey,
         * Map<String, Object> arguments
         */
        return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.release.#", null);
    }

    /**
     * Binding between the exchange and stock.delay.queue
     * @return
     */
    @Bean
    public Binding stockLockedBinding() {
        /**
         * String destination, DestinationType destinationType, String exchange, String routingKey,
         * Map<String, Object> arguments
         */
        return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.locked", null);
    }
}
@Slf4j @Service @RabbitListener(queues = "stock.release.stock.queue") public class StockReleaseListener { @Autowired WareSkuService wareSkuService; /** * Auto unlock inventory * * @param stockLockedTo * @param message * @param channel */ @RabbitHandler public void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException { System.out.println("Received message to unlock inventory..."); try { wareSkuService.unlockStock(stockLockedTo); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { log.info("zgc_eee" + e.getMessage()); channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // e.printStackTrace(); } } }
/** * 1,Inventory is automatically unlocked. * The order is placed successfully, the inventory is locked successfully, and the next business call fails, resulting in order rollback. The previously locked inventory will be unlocked automatically. * 2,Order failed. * Lock inventory failed * <p> * As long as the message of unlocking inventory fails. Be sure to tell the service that unlocking failed and put it back in the queue. */ @Override public void unlockStock(StockLockedTo stockLockedTo) { Long id = stockLockedTo.getId(); StockDetailTo detail = stockLockedTo.getDetail(); Long detailId = detail.getId(); //Unlock WareOrderTaskDetailEntity wareOrderTaskDetailEntity = wareOrderTaskDetailService.getById(detailId); if (wareOrderTaskDetailEntity != null) { // Work order exists, unlock WareOrderTaskEntity wareOrderTaskEntity = wareOrderTaskService.getById(id); String orderSn = wareOrderTaskEntity.getOrderSn(); R r = orderFeignService.getOrderStatus(orderSn); if (r.getCode() == 0) { OrderVo orderVo = (OrderVo) r.getData(new TypeReference<OrderVo>() { }); if (orderVo == null || orderVo.getStatus().equals(OrderStatusEnum.CANCLED.getCode())) { // The order does not exist or has been cancelled // Unlock - the inventory work order status is locked if (wareOrderTaskDetailEntity.getLockStatus() == WareOrderConstant.WareOrderDetailStatusEnum.LOCKED.getCode()) { unlockStockFrom(detail.getSkuId(), detail.getWareId(), detail.getSkuNum(), detailId); } } } else { // Query order status - Remote failed - need to be put into queue again throw new RuntimeException("Remote service failed"); } } else { // The work order does not exist. 
Release the queue information } } /** * Database - inventory unlock * * @param skuId * @param wareId * @param skuNum * @param detailId */ private void unlockStockFrom(Long skuId, Long wareId, Integer skuNum, Long detailId) { // Inventory unlock wareSkuDao.unlockStock(skuId, wareId, skuNum); // Update inventory work order status WareOrderTaskDetailEntity wareOrderTaskDetailEntity = new WareOrderTaskDetailEntity(); wareOrderTaskDetailEntity.setId(detailId); wareOrderTaskDetailEntity.setLockStatus(WareOrderConstant.WareOrderDetailStatusEnum.UNLOCKED.getCode()); // Change to unlocked wareOrderTaskDetailService.updateById(wareOrderTaskDetailEntity); }
<!--sku Repository unlock--> <update id="unlockStock"> UPDATE `wms_ware_sku` SET stock_locked=stock_locked-#{skuNum} WHERE sku_id=#{skuId} AND ware_id=#{wareId} </update>
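The rules that unlockStock applies before touching the database can be condensed into one predicate: unlock only if the work-order detail still exists, the order is missing or cancelled, and the detail is still in the locked state. The sketch below is a stand-alone restatement of that decision. The numeric status codes are assumptions for illustration only; the project reads them from OrderStatusEnum and WareOrderConstant.

```java
// Hypothetical condensed form of the checks made in unlockStock.
final class UnlockDecisionSketch {
    // Assumed codes, for illustration only.
    static final int ORDER_CANCELLED = 4;
    static final int DETAIL_LOCKED = 1;

    /**
     * @param detailExists     whether the inventory work-order detail row was found
     * @param orderStatus      status of the order, or null if the order does not exist
     * @param detailLockStatus lock status stored on the work-order detail
     */
    static boolean shouldUnlock(boolean detailExists, Integer orderStatus, int detailLockStatus) {
        if (!detailExists) {
            // The stock lock itself was rolled back, so there is nothing to unlock.
            return false;
        }
        boolean orderGoneOrCancelled = orderStatus == null || orderStatus == ORDER_CANCELLED;
        return orderGoneOrCancelled && detailLockStatus == DETAIL_LOCKED;
    }
}
```

A failed remote call to the order service is deliberately not part of this predicate: in that case the method throws, and the listener rejects the message so it is requeued.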
Pitfall: remember to comment out the queue listener in MyMQConfig
Automatically close orders and manually trigger inventory unlocking
/** * Order release is directly bound to inventory release * @return */ @Bean public Binding orderReleaseOtherBingding() { return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.other.#", null); }
/** * Order expiration - trigger manual closing to unlock inventory * @param orderTo * @param message * @param channel * @throws IOException */ @RabbitHandler public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException { System.out.println("Order closed ready to unlock inventory..."); try{ wareSkuService.unlockStock(orderTo); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }catch (Exception e){ channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); } }
// Guards against a slow order service: if the order status is not updated in time, the stock message expires first,
// finds the order still in the "new" status and returns without doing anything.
// The stock of such a delayed order would then never be unlocked.
@Override
public void unlockStock(OrderTo orderTo) {
    String orderSn = orderTo.getOrderSn();
    WareOrderTaskEntity wareOrderTaskEntity = wareOrderTaskService.getOrderTaskByOrderSn(orderSn);
    if (wareOrderTaskEntity != null) {
        Long id = wareOrderTaskEntity.getId();
        // All details of this work order that are still locked
        List<WareOrderTaskDetailEntity> wareOrderTaskDetailEntities = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>()
                .eq("task_id", id)
                .eq("lock_status", WareOrderConstant.WareOrderDetailStatusEnum.LOCKED.getCode())
        );
        for (WareOrderTaskDetailEntity wareOrderTaskDetailEntity : wareOrderTaskDetailEntities) {
            // unlockStockFrom(skuId, wareId, skuNum, detailId)
            unlockStockFrom(wareOrderTaskDetailEntity.getSkuId(), wareOrderTaskDetailEntity.getWareId(), wareOrderTaskDetailEntity.getSkuNum(), wareOrderTaskDetailEntity.getId());
        }
    }
}
Ensure message reliability
Overall: (it can be made into a micro service, which is specially used to process mq)
Payment service (Alipay sandbox)
The synchronous and asynchronous callback addresses may be filled in or left empty. Also make sure that the asynchronous callback address is reachable from the public Internet
Intranet penetration
Intranet penetration, nginx and the Alipay sandbox
/** * Alipay computer payment * @param orderSn * @return * @throws AlipayApiException */ @ResponseBody @GetMapping(value = "/payOrder",produces = "text/html") public String payOrder(@RequestParam("orderSn") String orderSn) throws AlipayApiException { PayVo payVo = orderService.getOrderPay(orderSn); String pay = alipayTemplate.pay(payVo); return pay; }
Asynchronous notification
Receipt
Time from order to payment > automatic inventory unlocking time
Automatic receipt
Manual receipt
Seckill (flash sale) service
Scheduled tasks
/**
 * 1. A Spring cron expression has 6 fields; the 7th field (year) is not supported.
 * 2. In the day-of-week field, 1-7 stand for Monday to Sunday; MON-SUN also works.
 * 3. Scheduled tasks should not block each other. By default they do.
 *    1) The business code can submit itself to a thread pool asynchronously:
 *       CompletableFuture.runAsync(() -> {
 *           xxxxService.hello();
 *       }, executor);
 *    2) Use the scheduling thread pool; configured through TaskSchedulingProperties:
 *       spring.task.scheduling.pool.size=5
 *    3) Let the scheduled task itself run asynchronously (@Async).
 *
 * Solution: combine asynchronous execution with scheduled tasks to get non-blocking scheduled tasks.
 */
@Async
@Scheduled(cron = "* * * ? * 2")
public void testCron() throws InterruptedException {
    log.info("hello-------------");
    Thread.sleep(3000);
}
# Spring's own thread pools
#spring.task.scheduling.pool.size=5
spring.task.execution.pool.core-size=5
spring.task.execution.pool.max-size=50
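Option 1) from the notes above (hand the slow work to a thread pool so the scheduler thread returns immediately) can be tried without Spring. The sketch below uses only the JDK. AsyncTickSketch, tick and COMPLETED are hypothetical names, and Thread.sleep stands in for the real business call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal illustration of "scheduled method submits the work and returns at once".
final class AsyncTickSketch {
    static final AtomicInteger COMPLETED = new AtomicInteger();

    // What the body of a scheduled method could do: delegate to a worker pool.
    static CompletableFuture<Void> tick(ExecutorService workers, long workMillis) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(workMillis); // stands in for slow business logic
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            COMPLETED.incrementAndGet();
        }, workers);
    }
}
```

Because tick returns before the work finishes, the next trigger is not delayed by a slow run; the price is that two runs may overlap, which the business logic then has to tolerate.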
Scheduled listing of seckill products
// @Scheduled(cron = "0 * * * * ?") // Every minute (for testing)
@Scheduled(cron = "0 0 3 * * ?") // Production: every day at 3 a.m.
public void uploadSeckillSkuLatest3Days() {
    log.info("Scheduled listing of seckill products triggered...");
    // Distributed lock.
    // The status is updated while the lock is held, so whoever acquires the lock next sees the latest status.
    RLock lock = redissonClient.getLock(upload_lock);
    lock.lock(10, TimeUnit.SECONDS);
    log.info("Listing seckill products...");
    try {
        seckillService.uploadSeckillSkuLatest3Days();
    } catch (Exception e) {
        log.error("Listing seckill products failed", e);
    } finally {
        lock.unlock();
    }
}
private final String SESSIONS_CACHE_PREFIX = "seckill:sessions:";
private final String SKUKILL_CACHE_PREFIX = "seckill:skus";
private final String SKU_STOCK_SEMAPHORE = "seckill:stock:"; // + random code of the product

@Override
public void uploadSeckillSkuLatest3Days() {
    // 1. Fetch the seckill sessions of the latest three days
    R lates3DaySession = couponFeignService.getLates3DaySession();
    if (lates3DaySession.getCode() == 0) {
        // List the products
        List<SeckillSesssionsWithSkus> sessionData = (List<SeckillSesssionsWithSkus>) lates3DaySession.getData(new TypeReference<List<SeckillSesssionsWithSkus>>() {
        });
        // Cache to Redis
        // 1. Cache the session information
        saveSessionInfos(sessionData);
        // 2. Cache the products associated with each session
        saveSessionSkuInfos(sessionData);
    }
}

/**
 * Cache the session information
 *
 * @param sessionData
 */
private void saveSessionInfos(List<SeckillSesssionsWithSkus> sessionData) {
    if (sessionData != null) {
        sessionData.stream().filter(item -> {
            return item.getRelationSkus() != null && item.getRelationSkus().size() > 0;
        }).forEach(item -> {
            // Key: seckill:sessions:<start millis>_<end millis>
            String key = SESSIONS_CACHE_PREFIX + item.getStartTime().getTime() + "_" + item.getEndTime().getTime();
            Boolean hasKey = redisTemplate.hasKey(key);
            System.out.println("*************");
            if (!hasKey) {
                // Values: <sessionId>_<skuId>
                List<String> collect = item.getRelationSkus().stream().map(relationSku -> {
                    return relationSku.getPromotionSessionId() + "_" + relationSku.getSkuId();
                }).collect(Collectors.toList());
                System.out.println(collect.toString());
                redisTemplate.opsForList().leftPushAll(key, collect);
                // Expire the key when the session ends
                redisTemplate.expireAt(key, new Date(item.getEndTime().getTime()));
            }
        });
    }
}

/**
 * Cache the products associated with each session
 *
 * @param sessionData
 */
private void saveSessionSkuInfos(List<SeckillSesssionsWithSkus> sessionData) {
    if (sessionData != null) {
        sessionData.stream().filter(item -> {
            return item.getRelationSkus() != null && item.getRelationSkus().size() > 0;
        }).forEach(item -> {
            // Prepare the hash operations
            BoundHashOperations<String, Object, Object> ops = redisTemplate.boundHashOps(SKUKILL_CACHE_PREFIX);
            item.getRelationSkus().stream().forEach(relationSku -> {
                String redisKey = relationSku.getPromotionSessionId().toString() + "_" + relationSku.getSkuId().toString();
                if (!ops.hasKey(redisKey)) {
                    SecKillSkuRedisTo redisTo = new SecKillSkuRedisTo();
                    // 1. Basic data of the sku
                    R r = productFeignService.info(relationSku.getSkuId());
                    if (r.getCode() == 0) {
                        SkuInfoVo skuInfo = (SkuInfoVo) r.getData("skuInfo", new TypeReference<SkuInfoVo>() {
                        });
                        redisTo.setSkuInfo(skuInfo);
                    }
                    // 2. Seckill information of the sku
                    BeanUtils.copyProperties(relationSku, redisTo);
                    // 3. Seckill start and end time of the current product
                    redisTo.setStartTime(item.getStartTime().getTime());
                    redisTo.setEndTime(item.getEndTime().getTime());
                    // 4. Random code, used in the seckill URL: seckill?skuId=1&key=dadlajldj
                    String token = UUID.randomUUID().toString().replace("-", ""); // UUID text is separated by '-', not '_'
                    redisTo.setRandomCode(token);
                    // Save to Redis
                    String jsonString = JSON.toJSONString(redisTo);
                    ops.put(redisKey, jsonString);
                    // 5. Use the seckill stock as a distributed semaphore for rate limiting
                    RSemaphore semaphore = redissonClient.getSemaphore(SKU_STOCK_SEMAPHORE + token);
                    semaphore.trySetPermits(relationSku.getSeckillCount()); // number of items available for seckill
                    // Expire the semaphore when the session ends
                    semaphore.expireAt(item.getEndTime());
                }
            });
        });
    }
}
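The Redis layout built by the upload job comes down to three naming rules: a session key that encodes the time window, a hash field of the form sessionId_skuId, and a semaphore key that ends in the random code. The sketch below reproduces only that string handling. SeckillKeySketch is a hypothetical helper, not a class of the project. It strips '-' from the UUID because hyphens are the separators java.util.UUID produces.

```java
import java.util.UUID;

// Hypothetical helper that isolates the key and token naming used by the upload job.
final class SeckillKeySketch {
    static final String SESSIONS_CACHE_PREFIX = "seckill:sessions:";
    static final String SKU_STOCK_SEMAPHORE = "seckill:stock:";

    // seckill:sessions:<start millis>_<end millis>
    static String sessionKey(long startMillis, long endMillis) {
        return SESSIONS_CACHE_PREFIX + startMillis + "_" + endMillis;
    }

    // Hash field inside seckill:skus: <sessionId>_<skuId>
    static String skuField(long sessionId, long skuId) {
        return sessionId + "_" + skuId;
    }

    // 32 hex characters: a random UUID with its hyphens removed.
    static String randomCode() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    // seckill:stock:<random code>
    static String semaphoreKey(String randomCode) {
        return SKU_STOCK_SEMAPHORE + randomCode;
    }
}
```

Keying the semaphore by the random code rather than by the sku id means a client cannot address the stock counter before the session starts and the code is published.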
Show seckill products - list
/**
 * Returns the seckill products that can be bought at the current time
 *
 * @return
 */
@Override
public List<SecKillSkuRedisTo> getCurrentSeckillSkus() {
    // 1. Determine which seckill session the current time belongs to
    Set<String> keys = redisTemplate.keys(SESSIONS_CACHE_PREFIX + "*"); // Redis KEYS pattern match
    long time = new Date().getTime();
    if (keys != null && keys.size() > 0) {
        for (String key : keys) {
            // Key format: seckill:sessions:1623081600000_1623168000000
            String replace = key.replace(SESSIONS_CACHE_PREFIX, "");
            String[] s = replace.split("_");
            long start = Long.parseLong(s[0]);
            long end = Long.parseLong(s[1]);
            if (time >= start && time <= end) {
                // 2. Get all the products of this seckill session
                List<String> range = redisTemplate.opsForList().range(key, 0, -1); // 0..-1 reads the whole list
                BoundHashOperations<String, String, String> hashOps = redisTemplate.boundHashOps(SKUKILL_CACHE_PREFIX);
                List<String> list = hashOps.multiGet(range); // batch get
                if (list != null) {
                    List<SecKillSkuRedisTo> collect = list.stream().map(item -> {
                        SecKillSkuRedisTo redisTo = JSON.parseObject(item, SecKillSkuRedisTo.class);
                        return redisTo;
                    }).collect(Collectors.toList());
                    return collect;
                }
                break;
            }
        }
    }
    return null;
}
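The session lookup depends entirely on parsing the time window out of the key name. A stand-alone version of that check, using the example key from the comment above, might look as follows. SessionWindowSketch and isActive are hypothetical names.

```java
// Hypothetical helper: decides whether a session key's window contains a given instant.
final class SessionWindowSketch {
    static final String SESSIONS_CACHE_PREFIX = "seckill:sessions:";

    // Key format: seckill:sessions:<start millis>_<end millis>; both bounds are inclusive.
    static boolean isActive(String key, long nowMillis) {
        String[] parts = key.replace(SESSIONS_CACHE_PREFIX, "").split("_");
        long start = Long.parseLong(parts[0]);
        long end = Long.parseLong(parts[1]);
        return nowMillis >= start && nowMillis <= end;
    }
}
```

Encoding the window in the key keeps the lookup cheap, but it also means every request scans all session keys; with many sessions a sorted set scored by start time would be an alternative design.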
Show seckill products - details
/**
 * Get the seckill information of a product by skuId
 *
 * @param skuId
 * @return
 */
@Override
public SecKillSkuRedisTo getSkuSeckillInfo(Long skuId) {
    // Find all the products that take part in a seckill
    BoundHashOperations<String, String, String> hashOps = redisTemplate.boundHashOps(SKUKILL_CACHE_PREFIX);
    Set<String> keys = hashOps.keys();
    if (keys != null && keys.size() > 0) {
        // Field format: <sessionId>_<skuId>, e.g. 6_4. \d+ allows session ids with more than one digit.
        String regx = "\\d+_" + skuId;
        for (String key : keys) {
            if (Pattern.matches(regx, key)) {
                String json = hashOps.get(key);
                SecKillSkuRedisTo redisTo = JSON.parseObject(json, SecKillSkuRedisTo.class);
                if (redisTo == null) {
                    return null;
                }
                // Random code handling
                long time = new Date().getTime();
                if (time > redisTo.getStartTime() && time <= redisTo.getEndTime()) {
                    // Inside the seckill window: keep the random code
                } else {
                    // Outside the seckill window: never expose the random code
                    redisTo.setRandomCode(null);
                    if (time > redisTo.getEndTime()) {
                        // The session is over, so the cached entry can be deleted.
                        // Entries of sessions that have not started yet must stay.
                        hashOps.delete(key);
                    }
                }
                return redisTo;
            }
        }
    }
    return null;
}
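The hash fields have the form sessionId_skuId, so the pattern used to find a sku must allow for session ids of any length. Pattern.matches requires the whole string to match: a pattern of the form `\d_<skuId>` accepts only single-digit session ids, whereas `\d+_<skuId>` accepts any number of digits. The two helper methods below are hypothetical and exist only to show the difference.

```java
import java.util.regex.Pattern;

// Compares a single-digit session pattern with a multi-digit one.
final class SeckillKeyRegexSketch {

    // Matches only keys whose session id is exactly one digit.
    static boolean matchesSingleDigit(String key, long skuId) {
        return Pattern.matches("\\d_" + skuId, key);
    }

    // Matches keys whose session id has one or more digits.
    static boolean matchesAnySession(String key, long skuId) {
        return Pattern.matches("\\d+_" + skuId, key);
    }
}
```

With these helpers, the key 12_4 is rejected by the single-digit pattern and accepted by the multi-digit one, while 6_44 is rejected by both when looking for sku 4.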
Buy now - generate order
/**
 * Buy now - generate order
 *
 * @param killId sessionId_skuId
 * @param key    random code
 * @param num    purchase quantity
 * @return the order number, or null if the seckill failed
 */
@Override
public String kill(String killId, String key, Integer num) {
    long s1 = System.currentTimeMillis();
    MemberRespVo memberRespVo = LoginUserInterceptor.loginUser.get();
    // 1. Get the details of the current seckill product
    BoundHashOperations<String, String, String> hashOps = redisTemplate.boundHashOps(SKUKILL_CACHE_PREFIX);
    String json = hashOps.get(killId);
    if (!StringUtils.isEmpty(json)) {
        SecKillSkuRedisTo redisTo = JSON.parseObject(json, SecKillSkuRedisTo.class);
        // 2. Validate the request
        Long startTime = redisTo.getStartTime();
        Long endTime = redisTo.getEndTime();
        long time = new Date().getTime();
        Long ttl = endTime - time;
        // 2-1 The current time must be inside the seckill window
        if (time >= startTime && time <= endTime) {
            // 2-2 Check the random code and the sessionId_skuId
            String randomCode = redisTo.getRandomCode();
            String redisToSessionIdAndSkuId = redisTo.getPromotionSessionId() + "_" + redisTo.getSkuId();
            if (randomCode.equals(key) && redisToSessionIdAndSkuId.equals(killId)) {
                // 2-3 The quantity must not exceed the purchase limit per person
                if (num <= redisTo.getSeckillLimit()) {
                    // 2-4 Idempotency: has this user already bought? A successful seckill occupies the key userId_sessionId_skuId.
                    // SETNX creates the key and returns true only if it does not exist yet; otherwise it returns false.
                    String redisKey = memberRespVo.getId() + "_" + killId;
                    // The key expires when the session ends
                    Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(redisKey, num.toString(), ttl, TimeUnit.MILLISECONDS);
                    if (aBoolean) {
                        // The key was set, so this user has not bought in this session before
                        // 3. Acquire permits from the semaphore (deduct seckill stock)
                        RSemaphore semaphore = redissonClient.getSemaphore(SKU_STOCK_SEMAPHORE + randomCode);
                        boolean b = semaphore.tryAcquire(num); // non-blocking
                        // 4. Seckill succeeded: place the order quickly by sending an MQ message
                        if (b) {
                            String timeId = IdWorker.getTimeId();
                            SeckillOrderTo orderTo = new SeckillOrderTo();
                            orderTo.setOrderSn(timeId);
                            orderTo.setMemberId(memberRespVo.getId());
                            orderTo.setNum(num);
                            orderTo.setPromotionSessionId(redisTo.getPromotionSessionId());
                            orderTo.setSkuId(redisTo.getSkuId());
                            orderTo.setSeckillPrice(redisTo.getSeckillPrice());
                            rabbitTemplate.convertAndSend("order-event-exchange", "order.seckill.order", orderTo);
                            long s2 = System.currentTimeMillis();
                            log.info("time consuming...{}", (s2 - s1));
                            return timeId;
                        }
                    }
                }
            }
        }
    }
    return null;
}
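The core of kill is two atomic steps: occupy a per-user key (so each user buys at most once per session) and then take permits from a stock semaphore without blocking. The sketch below mimics that sequence in a single JVM, with ConcurrentHashMap.putIfAbsent standing in for Redis SETNX and java.util.concurrent.Semaphore standing in for the Redisson RSemaphore. KillFlowSketch is hypothetical, and the `num < 1` guard is an addition that the method above does not have.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

// Single-JVM imitation of the "occupy key, then acquire stock permits" sequence.
final class KillFlowSketch {
    private final ConcurrentHashMap<String, Integer> occupied = new ConcurrentHashMap<>(); // stands in for SETNX keys
    private final Semaphore stock;                                                          // stands in for RSemaphore
    private final int limitPerUser;
    private final AtomicLong orderSeq = new AtomicLong();

    KillFlowSketch(int stockCount, int limitPerUser) {
        this.stock = new Semaphore(stockCount);
        this.limitPerUser = limitPerUser;
    }

    // Returns an order number on success, or null if any check fails.
    String kill(long userId, String killId, int num) {
        if (num < 1 || num > limitPerUser) {
            return null; // quantity outside the allowed range (the lower bound is an added guard)
        }
        String userKey = userId + "_" + killId;
        if (occupied.putIfAbsent(userKey, num) != null) {
            return null; // this user already took part in the session
        }
        if (!stock.tryAcquire(num)) {
            return null; // sold out; the user key stays occupied, as in the flow sketched here
        }
        return "order-" + orderSeq.incrementAndGet();
    }
}
```

Doing the cheap per-user check before the semaphore keeps repeat requests from the same user away from the stock counter, which is the contended resource.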
/** * Monitor seckill to generate order * @param seckillOrderTo * @param channel * @param message * @throws IOException */ @RabbitHandler public void listener(SeckillOrderTo seckillOrderTo, Channel channel, Message message) throws IOException { log.info("Prepare to create the details of the second kill order..." + seckillOrderTo.getOrderSn()); try { orderService.createSeckillOrder(seckillOrderTo); //Manual acquiring function channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { log.info("seckillOrder:" + e.getMessage()); channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } }
/** * Second kill generate order * @param seckillOrderTo */ @Override public void createSeckillOrder(SeckillOrderTo seckillOrderTo) { //TODO save order information OrderEntity orderEntity = new OrderEntity(); orderEntity.setOrderSn(seckillOrderTo.getOrderSn()); orderEntity.setMemberId(seckillOrderTo.getMemberId()); orderEntity.setStatus(OrderStatusEnum.CREATE_NEW.getCode()); BigDecimal multiply = seckillOrderTo.getSeckillPrice().multiply(new BigDecimal("" + seckillOrderTo.getNum())); // Total amount: single room x quantity, no other preferential information orderEntity.setPayAmount(multiply); this.save(orderEntity); //TODO save order item information - temporarily omit sku specific information OrderItemEntity orderItemEntity = new OrderItemEntity(); orderItemEntity.setOrderSn(seckillOrderTo.getOrderSn()); orderItemEntity.setRealAmount(multiply); orderItemEntity.setSkuQuantity(seckillOrderTo.getNum()); orderItemService.save(orderItemEntity); // TODO sends information to the queue and cancels the order without purchasing }
Sentinel circuit breaking and degradation
Concepts
https://github.com/alibaba/spring-cloud-alibaba/wiki/Sentinel
java -jar sentinel-dashboard-1.6.3.jar --server.port=8333
The application can only be displayed at an address to be applied, depending on
Real time monitoring Endpoint support (only displayed when the single machine threshold is exceeded)
# Sentinel endpoint support
management.endpoints.web.exposure.include=*

<!-- Sentinel endpoint support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Custom flow control response
@Configuration
public class SeckillSentinelConfig {

    public SeckillSentinelConfig() {
        WebCallbackManager.setUrlBlockHandler(new UrlBlockHandler() {
            @Override
            public void blocked(HttpServletRequest request, HttpServletResponse response, BlockException ex) throws IOException {
                // Return a JSON error body instead of the default response when a request is blocked
                R error = R.error(BizCodeEnum.TOO_MANY_REQUEST.getCode(), BizCodeEnum.TOO_MANY_REQUEST.getMessage());
                response.setCharacterEncoding("UTF-8");
                response.setContentType("application/json");
                response.getWriter().write(JSON.toJSONString(error));
            }
        });
    }
}
Feign circuit breaking and manual degradation
Caller: Feign
spring-cloud-starter-openfeign was already added as a dependency earlier.
- 1) Circuit-breaker protection on the caller: feign.sentinel.enabled=true
- 2) The caller manually specifies a degradation policy for the remote service. When the remote service is degraded, the caller's fallback method is triggered.
- 3) Under heavy traffic, some remote services have to be sacrificed. Specify the degradation policy on the service provider (the remote service).
The provider keeps running, but instead of executing its own business logic it returns the default degraded data (the rate-limited response).
@FeignClient(value = "gulimall-seckill", fallback = SeckillFeignServiceFallBack.class)
public interface SeckillFeignService {

    @GetMapping("/sku/seckill/{skuId}")
    R getSkuSeckillInfo(@PathVariable("skuId") Long skuId);
}
@Slf4j
@Component
public class SeckillFeignServiceFallBack implements SeckillFeignService {

    @Override
    public R getSkuSeckillInfo(Long skuId) {
        // Called instead of the remote service when the circuit is open or the call fails
        log.info("Fallback method called...getSkuSeckillInfo");
        return R.error(BizCodeEnum.TOO_MANY_REQUEST.getCode(), BizCodeEnum.TOO_MANY_REQUEST.getMessage());
    }
}
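To make the fallback mechanism concrete, here is a minimal plain-Java sketch of the same idea without Feign or Sentinel: a wrapper tries the remote call and, if it fails, returns the result of a fallback implementation of the same interface. All names (`SeckillClient`, `GuardedClient`, the returned strings) are hypothetical. The real wiring is done by Feign through the `fallback` attribute shown above.

```java
public class FallbackSketch {

    /** Hypothetical stand-in for the Feign interface. */
    interface SeckillClient {
        String getSkuSeckillInfo(long skuId);
    }

    /** Fallback: returns default degraded data instead of calling the remote service. */
    static class FallbackClient implements SeckillClient {
        @Override
        public String getSkuSeckillInfo(long skuId) {
            return "TOO_MANY_REQUEST";
        }
    }

    /** Wrapper: delegates to the remote client and switches to the fallback on failure. */
    static class GuardedClient implements SeckillClient {
        private final SeckillClient remote;
        private final SeckillClient fallback;

        GuardedClient(SeckillClient remote, SeckillClient fallback) {
            this.remote = remote;
            this.fallback = fallback;
        }

        @Override
        public String getSkuSeckillInfo(long skuId) {
            try {
                return remote.getSkuSeckillInfo(skuId);
            } catch (RuntimeException e) {
                return fallback.getSkuSeckillInfo(skuId);
            }
        }
    }

    public static void main(String[] args) {
        SeckillClient healthy = skuId -> "seckill info for sku " + skuId;
        SeckillClient broken = skuId -> { throw new IllegalStateException("remote service down"); };

        System.out.println(new GuardedClient(healthy, new FallbackClient()).getSkuSeckillInfo(1L));
        System.out.println(new GuardedClient(broken, new FallbackClient()).getSkuSeckillInfo(1L));
    }
}
```

Because the fallback implements the same interface as the client, the caller's code does not change whether it receives real or degraded data.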
Caller - Manual
Provider - Manual
Custom protected resources
 * 5. Custom protected resources
 *   1) In code:
 *      try (Entry entry = SphU.entry("seckillSkus")) {
 *          // Business logic
 *      } catch (Exception e) {}
 *
 *   2) With an annotation:
 *      @SentinelResource(value = "getCurrentSeckillSkusResource", blockHandler = "blockHandler")
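The shape of the code-based variant (enter the resource, run the business logic, handle the blocked case in `catch`) can be tried out without Sentinel. The sketch below is not Sentinel's implementation. It swaps in a deliberately simple fixed-window counter, and `QpsGuard`, `BlockedException` and the one-second window are assumptions made only for illustration.

```java
public class ResourceGuardSketch {

    /** Hypothetical counterpart of a "request was blocked" exception. */
    static class BlockedException extends Exception { }

    /** Fixed-window counter: at most maxPerSecond entries per 1000 ms window. */
    static class QpsGuard {
        private final int maxPerSecond;
        private long windowStart = 0;
        private int count = 0;

        QpsGuard(int maxPerSecond) {
            this.maxPerSecond = maxPerSecond;
        }

        synchronized void enter(long nowMillis) throws BlockedException {
            if (nowMillis - windowStart >= 1000) {
                // A new window begins: reset the counter.
                windowStart = nowMillis;
                count = 0;
            }
            if (count >= maxPerSecond) {
                throw new BlockedException();
            }
            count++;
        }
    }

    /** Protected resource: business logic runs only if the guard lets the call in. */
    static String getCurrentSeckillSkus(QpsGuard guard, long nowMillis) {
        try {
            guard.enter(nowMillis);
            return "skus";      // business logic would go here
        } catch (BlockedException e) {
            return "blocked";   // degraded result for rejected calls
        }
    }

    public static void main(String[] args) {
        QpsGuard guard = new QpsGuard(2);
        System.out.println(getCurrentSeckillSkus(guard, 0));    // skus
        System.out.println(getCurrentSeckillSkus(guard, 10));   // skus
        System.out.println(getCurrentSeckillSkus(guard, 20));   // blocked
        System.out.println(getCurrentSeckillSkus(guard, 1000)); // skus
    }
}
```

The time is passed in as a parameter so the behaviour is deterministic. With Sentinel, the limits come from the rules configured in the dashboard rather than from a constructor argument.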
Gateway flow control
<!-- Sentinel gateway flow control -->
<!-- https://mvnrepository.com/artifact/com.alibaba.csp/sentinel-spring-cloud-gateway-adapter -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-sentinel-gateway</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>
Both the gateway microservice and the Sentinel service need to be restarted.
@Configuration
public class SentinelGatewayConfig {

    // TODO reactive programming
    // GatewayCallbackManager
    public SentinelGatewayConfig() {
        GatewayCallbackManager.setBlockHandler(new BlockRequestHandler() {
            // Called when the gateway rate-limits a request; the result is a reactive type (Mono / Flux)
            @Override
            public Mono<ServerResponse> handleRequest(ServerWebExchange exchange, Throwable t) {
                R error = R.error(BizCodeEnum.TOO_MANY_REQUEST.getCode(), BizCodeEnum.TOO_MANY_REQUEST.getMessage());
                String errJson = JSON.toJSONString(error);
                Mono<ServerResponse> body = ServerResponse.ok().body(Mono.just(errJson), String.class);
                return body;
            }
        });
    }
}
Sleuth + Zipkin service link tracing
Integrate Sleuth
<!-- Link tracing -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>

# Sleuth debug logging
logging.level.org.springframework.cloud.openfeign=debug
logging.level.org.springframework.cloud.sleuth=debug
Integrate Zipkin for visualization (this changes a lot between versions)
// Install the Zipkin server with Docker
docker run -d -p 9411:9411 openzipkin/zipkin

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

# Zipkin visualization
spring.zipkin.base-url=http://192.168.56.10:9411/
spring.zipkin.discovery-client-enabled=false
spring.zipkin.sender.type=web
spring.sleuth.sampler.probability=1
The Zipkin starter already includes Sleuth, so the separate Sleuth dependency can be omitted.
In the Zipkin UI, click the RUN QUERY button to display the traced links.