[應用]輕探Web Flux

作者: kentyeh (kent)   2022-04-21 22:46:53
網頁版在
https://kentyeh.blogspot.com/2022/04/webflux.html
===================================
本來標題是想寫輕探春日流轉的,想想還是算了。
程式碼 https://github.com/kentyeh/FluxWeb,您可以先git clone下來備用。
Spring 5 後來始導入non-blocking IO、reactive backpressure的Web開發方式;僅管
Spring官方稱WebFlux不會比Servlet快到哪去,但實際面臨到需要I/O的情況下,理論上
總是會快一點,像用reactor netty抓網頁的方式,我感覺就是比Apache HttpClient來的
快些。
轉到WebFlux首先要面臨的就是Servlet不再,沒了JSP,也沒了JSTL,一開始真的很難習
慣,忽然發現一堆Listener沒得用,也沒辦法 Wrap Servlet Request,但為了或許能快
那麼一點點,總是得付出些代價。
在學習的過程式,覺得困難點大概有三,分別是Web轉換、Secuity應用與WebSocket管控
,我想就這幾點來說明如果克服(至於如何寫Reactive,不想在這裡多說,
http://projectreactor.io 可以瞭解一下,網路也有一堆教學文件)。
首先要面臨的是Web撰寫方式的轉換:
Spring boot 提供了一堆 spring-boot-starter-xxxx,可以很方便的開始一個專案,優
點是快速,缺點是引用了一堆可能用不到的Libraries,我並不打算以此為進入點。
WebFlux在少了Container的情況下,註定以應用程式的方式存在,而應用程式的方式就是
採用ApplicationContext去載入一些程式設定
package wf;
public class Main {
public static void main(String[] args) throws IOException {
try (AbstractApplicationContext context = new
AnnotationConfigApplicationContext(wf.config.AppConfig.class)) {
context.registerShutdownHook();
context.getBean(DisposableServer.class).onDispose().block();
}
....
所以AppConfig.java就是設定的進入點,上述程式載入設定後,隨即就是啟動HttpServer

package wf.config;
@Configuration
@ImportResource("classpath:applicationContext.xml")
@Import({WebConfig.class, PostgresR2dbConfig.class, H2R2dbConfig.class,
SecConfig.class, WsConfig.class})
public class AppConfig {
...
@Configuration不用多說,寫過Spring程式的人都應該知道 。
至於@ImportResource,嗯!我是念舊的人,習慣把設定放在XML內(從Ver 3開始養成的)
,applicationContext.xml包含了Component Scan 與 thymeleaf(取代JSP)的一些設定。
AppConfig.java依序載入了Web設定、資料庫設定、安全性設定與WebSocket設定。
WebConfig.java包含了WebFlux運作的基礎設定:
前面說了,沒了Servlet,WebFlux就必須找一些替代品,首先面臨的就是Session的問題
,Spring Session提供了多種選擇,我想為了效能,您應該不會選用jDBC的選項,以前我
用過Hazelcast,好處是去中心化(不用多備一台主機,直接把函式庫綁入程式內),只要
還有一台Web存活(指的是Cluster架構),資料就不會丟失,但缺點也是去中心化,想要操
縱資料,除了自已寫管理程式加入其中,不然就得花錢錢找官方,所以這次採用了大多數
人會用的Redis,好處是有Cli界面可用,缺點是要多備一台機器,一旦機器掛點,程式就
全掛了。
package wf.config;
@Configuration
@Import(RedisCacheConfig.class)
@EnableWebFlux
@EnableRedisWebSession(maxInactiveIntervalInSeconds = 30 * 60)
public class WebConfig implements WebFluxConfigurer, DisposableBean,
ApplicationListener<ContextClosedEvent> {
...
設定檔必須繼承WebFluxConfigurer並標註@EnableWebFlux是基本要件,
@EnableRedisWebSession則是說明以Redis做為Session資料戴體,理所當然,Redis可以
儲存Session資料,當然也可做為Cache所用,所以在此我
Import(RedisCacheConfig.class),並將連線Redis的程式放在RedisCacheConfig內。
WebConfig.java的重要責任就是建構httpServer()(也是Main.java程式啟動時主要載入的
標的),為了要在程式結束時優雅的結束httpServer,所以WebConfig也實做了
DisposableBean與ApplicationListener<ContextClosedEvent>,為了就在是程式終止時
順便關閉httpServer;
另外addResourceHandlers(...)是為了載入靜態目錄,當url指到了static時,首先從
Webjars先找(像我引用了JQuery與purecss的jar),找不到再找classpath裡的static目錄

@Override
public void addResourceHandlers(ResourceHandlerRegistry registry) {
registry.addResourceHandler("/static/**")
.addResourceLocations("classpath:/META-INF/resources/webjars/")
.addResourceLocations("classpath:/static/")
.resourceChain(false);
}
而localeContextResolver()則是指定使用url參數locale來變更語系(沒有多國語系,就
不用建構LocaleContextResolver)。
@Bean
public LocaleContextResolver localeContextResolver() {
return new LocaleContextResolver() {
...
至此再加寫一支帶有@Controller標記的程式(如wf.spring.RootController),就可以執
行Main.java讓WebFlux跑起來了。
在進入到下個主題之前,我必須提及spring.profiles.active這個系統屬性,Spring用這
個屬性來控制Profile,所以我決定當這個系統屬性為dev時,表示整個系統屬於開發模式
,否則就是正式環境模式,所以您可能注意到AppConfig.java同時載入了
PostgresR2dbConfig.class(正式環境)與H2R2dbConfig.class(開發環境)。
package wf.config;
@Configuration
@Profile("dev")
public class H2R2dbConfig extends R2dbConfig implements
ApplicationListener<ContextClosedEvent> {
...
為此我在POM.xml設定的對應的兩個Profile,以便在開發模式下,可以引用不同的函式庫
並執行一些初始作業(如建構資料庫與啟動一個Redis Mock Server)。
<profiles>
<profile>
<id>dev</id>
<properties>
<spring.profiles.active>dev</spring.profiles.active>
<http.port>8080</http.port>
<spring.freemarker.checkTemplateLocation>
false
</spring.freemarker.checkTemplateLocation>
</properties>
<dependencies>
...
</dependencies>
</profile>
<profile>
<id>prod</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spring.profiles.active>prod</spring.profiles.active>
<http.port>80</http.port>
</properties>
在開發時期,我只要執行
mvn -Pdev compile exec:java
就可以以測試環境的方式來執行程式。
當然,除了@Profile外,還有其它選擇,在RedisCacheConfig.java裡面有通往Redis
Server的連線設定
package wf.config;
Configuration
@EnableCaching(mode = AdviceMode.PROXY)
public class RedisCacheConfig extends CachingConfigurerSupport {
@Bean("prod")
@Conditional(LettuceConnFactoryCondition.class)
public LettuceConnectionFactory
redisProdConnectionFactory(GenericObjectPoolConfig gopc) {
logger.info("採用正式環境:連到正式Redis主機");
RedisSocketConfiguration config = new
RedisSocketConfiguration("/tmp/redis.sock");
...
LettucePoolingClientConfiguration poolConfig =
LettucePoolingClientConfiguration.builder()
.poolConfig(gopc).build();
return new LettuceConnectionFactory(config, poolConfig);
}
@Bean("dev")
@Conditional(LettuceConnFactoryCondition.class)
public LettuceConnectionFactory
redisDevConnectionFactory(GenericObjectPoolConfig gopc) {
logger.info("採用測試環境:連到測試Redis Mock");
RedisStandaloneConfiguration config = new
RedisStandaloneConfiguration("localhost",
context.getBean(RedisServer.class).getBindPort());
LettucePoolingClientConfiguration poolConfig =
LettucePoolingClientConfiguration.builder()
.poolConfig(gopc).build();
return new LettuceConnectionFactory(config, poolConfig);
}
然後透過LettuceConnFactoryCondition.java來決定哪個Bean應該被建立
package wf.util;
public class LettuceConnFactoryCondition implements Condition {
@Override
public boolean matches(ConditionContext context,
AnnotatedTypeMetadata metadata) {
String profile =
context.getEnvironment()
.getProperty("spring.profiles.active", "prod");
Map<String, Object> attributes =
metadata.getAnnotationAttributes(
Bean.class.getName());
String beanName = attributes == null ? ""
: ((String[]) attributes.get("value"))[0];
return beanName.startsWith(profile);
}
}
只要BeanName與開頭與系統屬性spring.profiles.active(預設為"prod")一致時則建立,
所以得以依這個屬性來決定要如何連結到Redis Server。
另外我想要提一下的是Ractive JDBC,目前有R2DBC(https://r2dbc.io)可用,當然
Spring也有對應的專案(https://spring.io/projects/spring-data-r2dbc),現下只支援
幾個主流的資料庫,而且大都不是官方開發的,最大的困擾還是在於沒有JNDI可用,讓我
沒法利用像Atomikos這種工具來作Two Phase Commit,失去了跨資料庫的機會,當然也可
換種想法,要快就不要跨資料庫Commit。
為了介紹後續的功能,我必須先說明一下我資料庫的Schema:只有包含兩個Table,一個
是成員,另一個是成員的角色。
CREATE TABLE IF NOT EXISTS member(
account varchar(10) primary key,
username varchar(16) not null,
passwd varchar(20) not null,
enabled varchar(1) default 'Y' check(enabled='Y' or enabled='N'),
birthday date default CURRENT_DATE
);
CREATE TABLE IF NOT EXISTS authorities(
aid SERIAL primary key,
account varchar(10) references member(account) on update cascade on delete
cascade,
authority varchar(50) not null
);
create unique index IF NOT EXISTS authorities_idx on
authorities(account,authority);
主要對應的類別是wf.model.Member,特別需要關注的是isNew()這個Method,Spring
Data主要透過這個方法來決定資料是要Insert還是Update;其它相關的物件有
wf.data.MemberDao(負責資料庫對應的查詢或更新)與wf.data.MemberManager(負責交易
的控制)。
說到這,我不得不提Spring的DataBinder,
@ControllerAdvice
public class ControlBinder {
/*private MemberManager manager;
@Autowired
public void setManager(MemberManager manager) {
this.manager = manager;
}*/
@InitBinder
public void initBinder(WebDataBinder binder) {
binder.registerCustomEditor(Date.class, new DatePropertyEditor());
binder.registerCustomEditor(LocalDate.class,
new LocalDatePropertyEditor());
binder.registerCustomEditor(Boolean.class,
new BooleanPropertyEditor());
//binder.registerCustomEditor(Member.Mono.class, manager);
}
}
上面註冊了一些物件,來做為資料在String與Object之間的轉換,可以看到這些類別都實
做了java.beans.PropertyEditor,(MemberManager也不例外,沒有註冊的原因是因為我
實做並在WebConfig.java註冊了另一個物件MemberFormatter),這種轉換有什麼用呢?且
看下面例子:
package wf.spring;
@Controller
public class RootController {
@GetMapping("/hello/{member}")
public String hello(@PathVariable("member") Member.Mono member,
Model model) {
model.addAttribute("user", member.get().switchIfEmpty(Mono.empty()));
return "hello";
}
}
只要在網址列打上Member的帳號,在叫用Method前,會先將PathVariable透過轉換器轉換
成對應的物件。
另一個常用的功能則是用@ModelAttribute來蒐集前端輸入
https://i.imgur.com/7eFAzHr.png
@Controller
public class RootController {
@PreAuthorize("hasRole('ADMIN')")
@PostMapping("/modifyMember/{member}")
public Mono<String> modifyMember(
@PathVariable("member") Member.Mono oriMember,
@ModelAttribute Member member, Model model) {
//避免後面有值但第一個沒勾,導致null字串
Iterables.removeIf(member.getRoles(), Predicates.isNull());
model.addAttribute("member", memberManager
.saveMember(member.setNew(false)));
return Mono.just("member");
}
}
前端輸入的資料,毌論是Master主體資料與Detail角色資料一併被蒐集並轉成member物件
(生日也因為註冊過LocalDatePropertyEditor也同樣被轉成LocalDate)。
在進入Security之前要提一下projectreactor.io的reactor.util.Logger,常常在Mono或
Flux加入log()方法來記錄除錯過程,其實它是叫用log(Logger),可惜這個Logger,其底
層是採用SLF4J所實作的非同步Log,但您可以注意到,我採用的是Apache Log4j2,雖然
Log4j2,有asyncLogger,但若Mono或Flux沒有對應的Logger可用,有點遺憾,所以我實
做了一個Loggers4j2,可以替代原本的Logger來對Mono或Flux除錯。Log4j2的
asyncLogger本質上是不希望記錄Log發生在何處,因為找出記錄發生在何處會使得效能大
大降低,所以稟持相同理念,您應該在Log時,讓訊息本身彰顯足以判斷出處,當然在開
發模式下,我還是會找出訊息記錄的發生處。
Spring Security的設定如下,可惜沒有辦法用XML進行設定
package wf.config;
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity
public class SecConfig {
...
@EnableWebFluxSecurity是說明採用Spring Security,@EnableReactiveMethodSecurity
則說明會採用Method級別的安全設定,
public class SecConfig {
@Bean
public SecurityWebFilterChain securitygWebFilterChain(
ServerHttpSecurity http) {
SecurityWebFilterChain build = http
.authorizeExchange()
.pathMatchers("/", "/index", "/hello/**", "/static/**",
"/login", "/logout").permitAll()
...
.formLogin((ServerHttpSecurity.FormLoginSpec flt) -> {
flt.authenticationManager(new
CaptchaUserDetailsReactiveAuthenticationManager(
userDetailsService()));
flt.loginPage("/login");
flt.authenticationFailureHandler(new
RedirectFluxWebAuthenticationFailureHandler(
"/login?error"));
})
...
.and().build();
build.getWebFilters().subscribe(filter -> {
if (filter instanceof AuthenticationWebFilter) {
AuthenticationWebFilter awf =
(AuthenticationWebFilter) filter;
awf.setServerAuthenticationConverter(new
CustomServerFormLoginAuthenticationConverter());
}
});
return build;
}
SecurityWebFilterChain(http)是主要的設定主體,可以看出我想自訂登錄畫面(主要是
加入Captcha),因為Spring Security使用UsernamePasswordAuthenticationToken來存放
用戶的帳號/密碼,所以以wf.security.UsernamePasswordCaptchaAuthenticationToken
來對應存放資訊,也因為多了Captcha,所以必須自行進行授權檢查,所以在formLogin裡
指定了自訂的wf.model.CaptchaUserDetailsReactiveAuthenticationManager,但問題來
了,Webflux把蒐集token的過程隱藏起來以致於沒辦法讓包含Captcha的token被轉送給
AuthenticationManager來處理,所以我也只能用過濾Filters的方式,把
wf.security.CustomServerFormLoginAuthenticationConverter替換給
AuthenticationWebFilter。
https://i.imgur.com/ASFzbtc.png
這裡要備註一點小提醒:Capatch驗證一旦取用,必須立即清除,否則機器人只要取用一
次,就可以無限次try帳/密就失去了Captch的意義了。
理論上加進了Security,那麼我們就能在Request Method裡面加上
@AuthenticationPrincipal來取的User Principal
public class RootController {
@GetMapping("/whoami")
public String whomai(@AuthenticationPrincipal
Mono<UserDetails> principal, Model model) {
model.addAttribute("user", principal);
return "index";
}
所以寫下了測試程式:
package wf.spring;
@WebFluxTest(controllers = RootController.class,
excludeAutoConfiguration = {ReactiveSecurityAutoConfiguration.class})
@TestExecutionListeners({ReactorContextTestExecutionListener.class
,WithSecurityContextTestExecutionListener.class})
@ContextConfiguration(classes = {TestContext.class})
public class TestRootController extends AbstractTestNGSpringContextTests {
@Test
@WithMockUser(username = "nobody", password = "nobody",
authorities = "ROLE_USER")
void testWhoAmi() {
Member member = new Member("nobody", "嘸人識君");
member.setPasswd("nobody");
member.addRole("ROLE_USER");
webClient
.mutateWith(mockUser(new MemberDetails(member)))
//.mutateWith(mockUser("嘸人識君").roles("USER"))
.get().uri("/whoami").header(HttpHeaders.CONTENT_TYPE,
MediaType.TEXT_HTML_VALUE)
.exchange().expectBody().consumeWith(response
-> Assertions.assertThat(new
String(response.getResponseBody()
, StandardCharsets.UTF_8)).contains("嘸人識君"));
}
發現@WithMockUser完全沒用,我猜是不是WithMockUserSecurityContextFactory裡的
createEmptyContext()的關係,所以不得不改用上述程式碼裡的
mutateWith(mockUser(…))方式來mock User。
這裡第一隻測試程式TestRootController必須先說明一下:
沒錯,TestCase是繼承AbstractTestNGSpringContextTests,為什麼是TestNG?那是因為
我第一次寫單元測試,JUnit沒有多執行緒測試,所以只能改用TestNG,另外的一個原因
則是TestNG產生的報表比較美觀。也不知道是不是因為TestNG的關係,導致一些行為不如
我的預期。
Spring的測試一般都會排除Security設定,讓測試的行為儘量單純,所以上述測試,
WebFluxTest首先要排除Reactive Security的自動設定,而@TestExecutionListeners用
來處理事前準備作業(其中的WithSecurityContextTestExecutionListener可以從所有測
試程式中移除,雖然我照著官方說明(https://bit.ly/3xIRhK5)來作,但完全看不出有
什麼用)。
然後實測結果,Principal還無無法傳播到 whoami(),所以我猜應該是某某不知名的原因
,導致測試環境沒有建立HandlerMethodArgumentResolver,所以我從boot抄來
wf.util.ConditionalOnMissingBean與wf.util.MissingBeanCondition,並在
package wf.config;
public class TestContext implements WebTestClientConfigurer {
@Bean("testAuthenticationPrincipalResolver")
@ConditionalOnMissingBean(AuthenticationPrincipalArgumentResolver.class)
public HandlerMethodArgumentResolver
authenticationPrincipalArgumentResolver(BeanFactory beanFactory) {
return new TestAuthenticationPrincipalResolver(beanFactory);
}
當環境缺少AuthenticationPrincipalArgumentResolver時,自動建立一個
wf.config.TestAuthenticationPrincipalResolver(也是抄來的),自此測試才算圓滿成
功。
這裡也要特別提醒,每一支AbstractTestNGSpringContextTests都運行在一個獨立的
Context中(多隻Tests運行時,會看到Spring Boot的LOGO跑出來多次)。
相信很多人對WebSocket的第一印象就是那個著名的Chat聊天程式,Client端發起一個
WebSocket連線到Server,Server則記著所有連線,只要接收到Client End傳來的訊息,
立即把該訊息逐一傳送給其它所有連線。
其實細究Client到Server建立連線有一個過程,一開始Client是透過URL連線到Server,
完成HandShake後才建立一個雙向連線(雙方都可發訊息給另一方),直到有一方中斷連線

所以Securiy的應用,第一步就是開始的那個URL連線,SpringSecurity管控URL是天經地
義;當WebSocket連線建立後,即使用戶登出,只要雙方沒有一方切斷連線,其實這個
WebSocket並不會受到影響,畢竟兩者處在不同世界,所以Server必須記著這個
WebSocket 連線,當用戶登出後,立即由Server端切斷WebSeocket連線。
記得前面說過,Spring Session可以用來建造Web Cluster嗎?因為Session是存在獨立的
Redis Server,所以Client端連線進來,並不在意Cookie會被送到叢集中的哪一台。她們
是等價的;但是WebSocket連線是一個持續的連線,一旦建立,Client便會和最後
HandShake的這台WebServer建立一條持久穩固的連線。也就是說:同一瀏覽器可能開啟多
個視窗連線到N台WebServer以建立WebSocket。
假設一用戶(Nobody)用兩個裝置的瀏覽器,先後登錄到WebServer並各自開啟兩個頁面(假
設是聊天室,可能連到不同的兩臺WebServer)並建立WebSocket。所以我們先確立幾件事
※四個WebSocket連線共登錄了兩次,所以有兩個不同的Session Id
※任何給Nobody的訊息,都應該送達這4個頁面
※其中一個瀏覽器進行登出,只會影響同瀏覽器的頁面,另一個裝置的兩個
WebSocket連線仍然持續運作
首先設定以/ws做為進入點,這個進入點在之前的安全設定必須為登錄過用戶使用,URL會
取得靜態網頁chat.html,同時這也是WebSocket HandShake的point.
public class RootController {
@GetMapping("/ws")
public String chat() {
return "chat";
}
然後在WsConfig.java指定WebSocket HandShake所在
package wf.config;
@Configuration
@Import(RedisCacheConfig.class)
public class WsConfig {
@Bean
public HandlerMapping handlerMapping(WebSocketHandler webSocketHandler) {
String path = "/chat";
Map<String, WebSocketHandler> map = new HashMap<>();
map.put(path, webSocketHandler);
return new SimpleUrlHandlerMapping(map, -1);
}
@Bean
public HandlerAdapter wsHandlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
return new wf.spring.HandshakeFluxWebSocketService();
}
介入HandShake過程(靠自訂HandshakeFluxWebSocketService)
WebSocket只能在HandShake時取得Session相關資訊,這也是為什麼需要介入HandShake
Service的原因,我們在HandShake的同時,將額外的資料設定給WebSocket
package wf.spring;
public class HandshakeFluxWebSocketService extends HandshakeWebSocketService
implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
setSessionAttributePredicate(s -> {
logger.info("轉遞 ({}) 給 WebSoketHandler", s);
return true;
});
}
@Override
public Mono<Void> handleRequest(ServerWebExchange exchange,
WebSocketHandler handler) {
exchange.getSession().subscribe(ws -> {
SecurityContext sc = ws.getAttribute(
HttpSessionSecurityContextRepository.SPRING_SECURITY_CONTEXT_KEY);
if (sc != null) {
logger.info("HandshakeFluxWebSocketService-principal is [{}]{}"
, sc.getAuthentication().getPrincipal().getClass().getName()
, sc.getAuthentication().getPrincipal());}
ws.getAttributes().put("JSESSIONID", ws.getId());
});
return super.handleRequest(exchange, handler);
}
基本上,HandleShake會想要把Session裡面所有的東西設定給WebSocket,但會先問一下
,基本上我是一律放行,所以setSessionAttributePredicate()都是回傳true;
之前也說過,瀏覽器登出時,要把相關的WebSocket全數關閉,所以我需要知道交互的對
象的Session ID,基本上Spring Session,所以在handleRequest(), 我取出後,直接放
到WebSocket的Attributes(Key值是JSESSIONID),您可以從執行的Log中看出端倪。
每個WebSocket連線後,SocketHandler都要建立一個WebSocketRedisListener物件(本身
會記住是屬於哪個JSESSIONID),物件的責任內容如下(this的部份不是很正確,勿怪):
https://i.imgur.com/uyF09YL.png
wf.spring.FluxWebSocketHandler裡面有一個全域物件存放所有的
WebSocketRedisListener,
public static final ListMultimap<String,websocketredislistener>
userListenser = MultimapBuilder...
WebSocketHandler一開始就是要建立一個WebSocketRedisListener,即使本身收到
Client End傳來的訊息,也要交由這個WebSocketRedisListener去廣播給所有WebSocket
連線。
@Component("serverLogoutSuccessHandler")
public class FluxWebSocketHandler implements WebSocketHandler,
ServerLogoutSuccessHandler {
public Mono<Void> handle(WebSocketSession session) {
Object jsessionId = session.getAttributes().get("JSESSIONID");
Object sectximp = session.getAttributes().get(
HttpSessionSecurityContextRepository.SPRING_SECURITY_CONTEXT_KEY);
if (sectximp != null && jsessionId != null &&
!jsessionId.toString().trim().isEmpty()) {
SecurityContext sectx = (SecurityContextImpl) sectximp;
Authentication auth = sectx.getAuthentication();
User user = auth == null ? null : ((User) auth.getPrincipal());
if (user != null) {
ReactiveRedisMessageListenerContainer container =
context.getBean(
ReactiveRedisMessageListenerContainer.class);
ReactiveRedisTemplate<String, JsonNode> redisTemplate =
new ReactiveRedisTemplate<>(connectionFactory,
serializationContext);
UnaryOperator<JsonNode> processor = notify -> notify;
WebSocketRedisListener<JsonNode> wsrl =
context.getBean(WebSocketRedisListener.class
,session, container, user.getUsername(), jsessionId
,serializationContext, redisTemplate, processor);
logger.info("put listener[{}] {}", jsessionId,
userListenser.put(jsessionId.toString().trim(), wsrl));
return session.receive().flatMap(webSocketMessage -> {
String payload = webSocketMessage.getPayloadAsText();
logger.debug("收到:{}", payload);
//convert payload to JsonNode
JsonNode notify = serializationContext
.getValueSerializationPair()
.read(ByteBuffer.wrap(payload.getBytes(
StandardCharsets.UTF_8)));
wsrl.getReactiveRedisTemplate().convertAndSend(
user.getUsername(), notify).subscribe();
return Mono.empty();
}).doOnTerminate(() -> {
if (userListenser.get(
jsessionId.toString()).remove(wsrl)) {
wsrl.destroy();
logger.info("移除監聽器");
} else {
logger.error("移除監聽器失敗");
}
}).doFinally(signal -> {
...
這個Handler也同時實做了ServerLogoutSuccessHandler,為的就是在用戶登出時,從
userListener清除同JSESSIONID的WebSocketRedisListener。
@Override
public Mono<Void> onLogoutSuccess(WebFilterExchange exchange,
Authentication authentication) {
String username = authentication.getPrincipal() == null
? exchange.getExchange().getPrincipal()
.map(p -> p.getName()).block()
: User.class.isAssignableFrom(
authentication.getPrincipal().getClass())
? ((User) authentication.getPrincipal()).getUsername()
: Principal.class.isAssignableFrom(
authentication.getPrincipal().getClass())
? ((Principal) authentication.getPrincipal()).getName()
: null;
logger.info(":{}登出", username);
exchange.getExchange().getSession().subscribe(ws -> {
logger.info("JSESSIONID:{}", ws.getId());
userListenser.removeAll(ws.getId()).forEach(wrl -> wrl.destroy());
}, t -> logger.error("登出排除WS時錯誤:" + t.getMessage(), t)
);
ServerHttpResponse response = exchange.getExchange().getResponse();
response.setStatusCode(HttpStatus.FOUND);
response.getCookies().remove("JSESSIONID");
response.getHeaders().setLocation(logoutSuccessUrl);
return exchange.getExchange().getSession()
.flatMap(WebSession::invalidate);
}
至此,完成我對WebSocket的期待,我心目中的購物車,就是當商品被放入購物車的時後
,資料打包丟給JMS去逐一處理(我用這種方式應付過9.8K-Google Analytics顯示人潮同
時開搶,可惜那時還不會應用WebSocket),後端處理完成後再透過WebSocket把購物車的
變動通知前端。
chat.html裡面有個放入購物車的按紐,就是透過Ajax通知後端放入購物車,然後把訊息
Publish給Redis,Listener收到後再透過WebSocket通知商品已放入購物車。
https://i.imgur.com/B1h1XFW.png
當您同時用不同裝置開啟蝦皮時,只要一個裝置放入商品,其它裝置的購物車也會同步更
新購物車就是我想要的效果。
再試試直接從Redis命令列直接發佈訊息
https://i.imgur.com/ewxrg7H.png
最後要考慮的是如何測試?前述的加入購物車,是個多步驟的過程,首先,需要有Redis的
環境,然後登錄,建立WebSocket連線,呼叫放入購物車,檢查回傳的訊息。近乎真實世
界的測試,此時已不能視為"單元"測試,而是應該視為"整合測試"。因為近乎真實,所以
會有CSRF、會有Captcha,為了測試的緣故,必須將CSRF與Catpch固定下來,所以Captcha
在有系統屬性"captcha"時,就會以此值做為預設值。也因為為了固定CSRF的值,所以測
試不會引入原本的wf.config.SecConfig,而是引入改寫過的TestSecConfig.java。
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity
public class TestSecConfig extends SecConfig {
SecurityWebFilterChain build = http
.authorizeExchange()
.pathMatchers("/", "/index", "/hello/**", "/static/**",
"/login", "/logout").permitAll()
.pathMatchers("/admin/**").hasAuthority("ROLE_ADMIN")
.anyExchange().authenticated()
.and().csrf(c -> c.csrfTokenRepository(
new FixedCsrfTokenRepository(csrf)))
...
在測試WebSocket,我用了兩種方式,分別是透過Reactor Netty的方式與HtmlUnit的方式
,ReactorNetty的方式比較低階(此時我比較想用這個,畢竟是在寫Reactive程式),必須
自行取Captcha圖檔,記住前次Cookies,發送FormData,HtmlUnit則比較高階,整個就是
個Headless Browser,除了看不到畫面,與一般的瀏覽器並無不同,但兩者都有同樣的問
題困擾我:就是無法真正掌握WebSocket建立連接的時刻(以前端的角度來看,就是無法掌
握WebSocket.onOpen的時機),逼得我沒辦法,只能以Thread.sleep(3000)應對,也許有
大神能夠開示一下。
在我學習WebFlux的過程,總感覺官方文件不是寫得很詳細,所以才想要記錄一下歷程,
也希望對後來者有點幫助。

Links booklink

Contact Us: admin [ a t ] ucptt.com