public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
inheritableThreadLocal.set("i am a inherit parent");
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(inheritableThreadLocal.get());
inheritableThreadLocal.set("i am a old inherit parent");// 子線程中設置新的值
}
});
TimeUnit.SECONDS.sleep(1);
inheritableThreadLocal.set("i am a new inherit parent");// 主線程設置新的值
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(inheritableThreadLocal.get());
}
});
}
i am a inherit parent
i am a old inherit parent
這里看第一個執(zhí)行結果,,發(fā)現(xiàn)主線程第二次設置的值,,沒有改掉,,還是第一次設置的值“i am a inherit parent”,這是什么原因呢,?
再看第二個例子的執(zhí)行結果,發(fā)現(xiàn)在第一個任務中設置的“i am a old inherit parent"的值,,在第二個任務中打印出來了,。這又是什么原因呢?
回過頭來看看上面的源碼,,在線程池的情況下,,第一次創(chuàng)建線程的時候會從父線程中copy inheritableThreadLocals中的數(shù)據(jù),所以第一個任務成功拿到了父線程設置的”i am a inherit parent“,,第二個任務執(zhí)行的時候復用了第一個任務的線程,,并不會觸發(fā)復制站長博客父線程中的inheritableThreadLocals操作,所以即使在主線程中設置了新的值,,也會不生效,。同時get()方法是直接操作inheritableThreadLocals這個變量的,所以就直接拿到了第一個任務設置的值,。
那遇到線程池應該怎么辦呢,?
2.3 TransmittableThreadLocal
TransmittableThreadLocal(TTL)這個時候就派上用場了。這是阿里開源的一個組件,,我們來看看它怎么解決線程池的問題,,先來一段代碼,在上面的基礎上修改一下,,使用TransmittableThreadLocal,。
static TransmittableThreadLocal<String> transmittableThreadLocal = new TransmittableThreadLocal<>();// 使用TransmittableThreadLocal
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService = TtlExecutors.getTtlExecutorService(executorService); // 用TtlExecutors裝飾線程池
transmittableThreadLocal.set("i am a transmittable parent");
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(transmittableThreadLocal.get());
transmittableThreadLocal.set("i am a old transmittable parent");// 子線程設置新的值
}
});
System.out.println(transmittableThreadLocal.get());
TimeUnit.SECONDS.sleep(1);
transmittableThreadLocal.set("i am a new transmittable parent");// 主線程設置新的值
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(transmittableThreadLocal.get());
}
});
}
i am a transmittable parent
i am a transmittable parent
i am a new transmittable parent
執(zhí)行代碼后發(fā)現(xiàn),使用TransmittableThreadLocalTtlExecutors.getTtlExecutorService(executorService)裝飾線程池之后,,在每次調用任務的時,,都會將當前的主線程的TransmittableThreadLocal數(shù)據(jù)copy到子線程里面,執(zhí)行完成后,,再清除掉,。同時子線程里面的修改回到主線程時其實并沒有生效。這樣可以保證每次任務執(zhí)行的時候都是互不干涉的,。這是怎么做到的呢,?來看源碼。
private TtlRunnable(Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
this.capturedRef = new AtomicReference<Object>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
com.alibaba.ttl.TtlRunnable#run
/**
* wrap method {@link Runnable#run()}.
*/
@Override
public void run() {
Object captured = capturedRef.get();// 獲取線程的ThreadLocalMap
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
Object backup = replay(captured);// 暫存當前子線程的ThreadLocalMap到backup
try {
runnable.run();
} finally {
restore(backup);// 恢復線程執(zhí)行時被改版的Threadlocal對應的值
}
}
com.alibaba.ttl.TransmittableThreadLocal.Transmitter#replay
/**
* Replay the captured {@link TransmittableThreadLocal} values from {@link #capture()},
* and return the backup {@link TransmittableThreadLocal} values in current thread before replay.
*
* @param captured captured {@link TransmittableThreadLocal} values from other thread from {@link #capture()}
* @return the backup {@link TransmittableThreadLocal} values before replay
* @see #capture()
* @since 2.3.0
*/
public static Object replay(Object captured) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
// backup
backup.put(threadLocal, threadLocal.get());
// clear the TTL value only in captured
// avoid extra TTL value in captured, when run task.
if (!capturedMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// set value to captured TTL
for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : capturedMap.entrySet()) {
@SuppressWarnings("unchecked")
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
threadLocal.set(entry.getValue());
}
// call beforeExecute callback
doExecuteCallback(true);
return backup;
}
com.alibaba.ttl.TransmittableThreadLocal.Transmitter#restore
/**
* Restore the backup {@link TransmittableThreadLocal} values from {@link Transmitter#replay(Object)}.
*
* @param backup the backup {@link TransmittableThreadLocal} values from {@link Transmitter#replay(Object)}
* @since 2.3.0
*/
public static void restore(Object backup) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
// call afterExecute callback
doExecuteCallback(false);
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
// clear the TTL value only in backup
// avoid the extra value of backup after restore
if (!backupMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// restore TTL value
for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : backupMap.entrySet()) {
@SuppressWarnings("unchecked")
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
threadLocal.set(entry.getValue());
}
}
OK,,既然問題都解決了,,來看看實際使用吧,有兩種使用,先看第一種,,涉及HTTP請求,、Dubbo請求和 job,采用的是數(shù)據(jù)級別的隔離,。
三,、 TTL 在海外商城的實際應用
3.1 不分庫,分數(shù)據(jù)行 + SpringMVC
用戶 HTTP 請求,,首先我們要從url或者cookie中解析出國家編號,,然后在TransmittableThreadLocal中存放國家信息,在 MyBatis 的攔截器中讀取國家數(shù)據(jù),,進行sql改造,,最終操作指定的國家數(shù)據(jù),多線程場景下用TtlExecutors包裝原有自定義線程池,,保障在使用線程池的時候能夠正確將國家信息傳遞下去,。
public class ShopShardingHelperUtil {
private static TransmittableThreadLocal<String> countrySet = new TransmittableThreadLocal<>();
/**
* 獲取threadLocal中設置的國家標志
* @return
*/
public static String getCountry() {
return countrySet.get();
}
/**
* 設置threadLocal中設置的國家
*/
public static void setCountry (String country) {
countrySet.set(country.toLowerCase());
}
/**
* 清除標志
*/
public static void clear () {
countrySet.remove();
}
}
/** 攔截器對cookie和url綜合判斷國家信息,放入到TransmittableThreadLocal中 **/
// 設置線程中的國家標志
String country = localeContext.getLocale().getCountry().toLowerCase();
ShopShardingHelperUtil.setCountry(country);
/** 自定義線程池,,用TtlExecutors包裝原有自定義線程池 **/
public static Executor getExecutor() {
if (executor == null) {
synchronized (TransmittableExecutor.class) {
if (executor == null) {
executor = TtlExecutors.getTtlExecutor(initExecutor());// 用TtlExecutors裝飾Executor,,結合TransmittableThreadLocal解決異步線程threadlocal傳遞問題
}
}
}
return executor;
}
/** 實際使用線程池的地方,直接調用執(zhí)行即可**/
TransmittableExecutor.getExecutor().execute(new BatchExeRunnable(param1,param2));
/** mybatis的Interceptor代碼, 使用TransmittableThreadLocal的國家信息,改造原有sql,,加上國家參數(shù),,在增刪改查sql中區(qū)分國家數(shù)據(jù) **/
public Object intercept(Invocation invocation) throws Throwable {
StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
BoundSql boundSql = statementHandler.getBoundSql();
String originalSql = boundSql.getSql();
Statement statement = (Statement) CCJSqlParserUtil.parse(originalSql);
String threadCountry = ShopShardingHelperUtil.getCountry();
// 線程中的國家不為空才進行處理
if (StringUtils.isNotBlank(threadCountry)) {
if (statement instanceof Select) {
Select selectStatement = (Select) statement;
VivoSelectVisitor vivoSelectVisitor = new VivoSelectVisitor(threadCountry);
vivoSelectVisitor.init(selectStatement);
} else if (statement instanceof Insert) {
Insert insertStatement = (Insert) statement;
VivoInsertVisitor vivoInsertVisitor = new VivoInsertVisitor(threadCountry);
vivoInsertVisitor.init(insertStatement);
} else if (statement instanceof Update) {
Update updateStatement = (Update) statement;
VivoUpdateVisitor vivoUpdateVisitor = new VivoUpdateVisitor(threadCountry);
vivoUpdateVisitor.init(updateStatement);
} else if (statement instanceof Delete) {
Delete deleteStatement = (Delete) statement;
VivoDeleteVisitor vivoDeleteVisitor = new VivoDeleteVisitor(threadCountry);
vivoDeleteVisitor.init(deleteStatement);
}
Field boundSqlField = BoundSql.class.getDeclaredField("sql");
boundSqlField.setAccessible(true);
boundSqlField.set(boundSql, statement.toString());
} else {
logger.error("----------- intercept not-add-country sql.... ---------" + statement.toString());
}
logger.info("----------- intercept query new sql.... ---------" + statement.toString());
// 調用方法,實際上就是攔截的方法
Object result = invocation.proceed();
return result;
}
對于 Dubbo 接口和無法判斷國家信息的 HTTP 接口,,在入?yún)⒉糠衷黾訃倚畔?shù),,通過攔截器或者手動set國家信息到TransmittableThreadLocal。
對于定時任務 job,,因為所有國家都需要執(zhí)行,,所以會把所有國家進行遍歷執(zhí)行,這也可以通過簡單的注解來解決,。
這個版本的改造,,點檢測試也基本通過了,自動化腳本驗證也是沒問題的,,不過因為業(yè)務發(fā)展問題最終沒上線,。
3.2 分庫 + SpringBoot
后續(xù)在建設新的國家商城的時候,分庫分表方案調整為每個國家獨立數(shù)據(jù)庫,,同時整體開發(fā)框架升級到SpringBoot,,我們把這套方案做了升級,總體思路是一樣的,,只是在實現(xiàn)細節(jié)上略有不同,。
SpringBoot 里面的異步一般通過@Async這個注解來實現(xiàn),,通過自定義線程池來包裝,使用時在 HTTP 請求判斷locale信息的寫入國家信息,,后續(xù)完成切DB的操作。
對于 Dubbo 接口和無法判斷國家信息的 HTTP 接口,,在入?yún)⒉糠衷黾訃倚畔?shù),,通過攔截器或者手動set國家信息到TransmittableThreadLocal。
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
return TtlThreadPoolExecutors.getAsyncExecutor();
}
public class TtlThreadPoolExecutors {
private static final String COMMON_BUSINESS = "COMMON_EXECUTOR";
public static final int QUEUE_CAPACITY = 20000;
public static ExecutorService getExecutorService() {
return TtlExecutorServiceMananger.getExecutorService(COMMON_BUSINESS);
}
public static ExecutorService getExecutorService(String threadGroupName) {
return TtlExecutorServiceMananger.getExecutorService(threadGroupName);
}
public static ThreadPoolTaskExecutor getAsyncExecutor() {
// 用TtlExecutors裝飾Executor,,結合TransmittableThreadLocal解決異步線程threadlocal傳遞問題
return getTtlThreadPoolTaskExecutor(initTaskExecutor());
}
private static ThreadPoolTaskExecutor initTaskExecutor () {
return initTaskExecutor(TtlThreadPoolFactory.DEFAULT_CORE_SIZE, TtlThreadPoolFactory.DEFAULT_POOL_SIZE, QUEUE_CAPACITY);
}
private static ThreadPoolTaskExecutor initTaskExecutor (int coreSize, int poolSize, int executorQueueCapacity) {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(coreSize);
taskExecutor.setMaxPoolSize(poolSize);
taskExecutor.setQueueCapacity(executorQueueCapacity);
taskExecutor.setKeepAliveSeconds(120);
taskExecutor.setAllowCoreThreadTimeOut(true);
taskExecutor.setThreadNamePrefix("TaskExecutor-ttl");
taskExecutor.initialize();
return taskExecutor;
}
private static ThreadPoolTaskExecutor getTtlThreadPoolTaskExecutor(ThreadPoolTaskExecutor executor) {
if (null == executor || executor instanceof ThreadPoolTaskExecutorWrapper) {
return executor;
}
return new ThreadPoolTaskExecutorWrapper(executor);
}
}
/**
* @ClassName : LocaleContextHolder
* @Description : 本地化信息上下文holder
*/
public class LocalizationContextHolder {
private static TransmittableThreadLocal<LocalizationContext> localizationContextHolder = new TransmittableThreadLocal<>();
private static LocalizationInfo defaultLocalizationInfo = new LocalizationInfo();
private LocalizationContextHolder(){}
public static LocalizationContext getLocalizationContext() {
return localizationContextHolder.get();
}
public static void resetLocalizationContext () {
localizationContextHolder.remove();
}
public static void setLocalizationContext (LocalizationContext localizationContext) {
if(localizationContext == null) {
resetLocalizationContext();
} else {
localizationContextHolder.set(localizationContext);
}
}
public static void setLocalizationInfo (LocalizationInfo localizationInfo) {
LocalizationContext localizationContext = getLocalizationContext();
String brand = (localizationContext instanceof BrandLocalizationContext ?
((BrandLocalizationContext) localizationContext).getBrand() : null);
if(StringUtils.isNotEmpty(brand)) {
localizationContext = new SimpleBrandLocalizationContext(localizationInfo, brand);
} else if(localizationInfo != null) {
localizationContext = new SimpleLocalizationContext(localizationInfo);
} else {
localizationContext = null;
}
setLocalizationContext(localizationContext);
}
public static void setDefaultLocalizationInfo(@Nullable LocalizationInfo localizationInfo) {
LocalizationContextHolder.defaultLocalizationInfo = localizationInfo;
}
public static LocalizationInfo getLocalizationInfo () {
LocalizationContext localizationContext = getLocalizationContext();
if(localizationContext != null) {
LocalizationInfo localizationInfo = localizationContext.getLocalizationInfo();
if(localizationInfo != null) {
return localizationInfo;
}
}
return defaultLocalizationInfo;
}
public static String getCountry(){
return getLocalizationInfo().getCountry();
}
public static String getTimezone(){
return getLocalizationInfo().getTimezone();
}
public static String getBrand(){
return getBrand(getLocalizationContext());
}
public static String getBrand(LocalizationContext localizationContext) {
if(localizationContext == null) {
return null;
}
if(localizationContext instanceof BrandLocalizationContext) {
return ((BrandLocalizationContext) localizationContext).getBrand();
}
throw new LocaleException("unsupported localizationContext type");
}
}
@Override
public LocaleContext resolveLocaleContext(final HttpServletRequest request) {
parseLocaleCookieIfNecessary(request);
LocaleContext localeContext = new TimeZoneAwareLocaleContext() {
@Override
public Locale getLocale() {
return (Locale) request.getAttribute(LOCALE_REQUEST_ATTRIBUTE_NAME);
}
@Override
public TimeZone getTimeZone() {
return (TimeZone) request.getAttribute(TIME_ZONE_REQUEST_ATTRIBUTE_NAME);
}
};
// 設置線程中的國家標志
setLocalizationInfo(request, localeContext.getLocale());
return localeContext;
}
private void setLocalizationInfo(HttpServletRequest request, Locale locale) {
String country = locale!=null?locale.getCountry():null;
String language = locale!=null?(locale.getLanguage() + "_" + locale.getVariant()):null;
LocaleRequestMessage localeRequestMessage = localeRequestParser.parse(request);
final String countryStr = country;
final String languageStr = language;
final String brandStr = localeRequestMessage.getBrand();
LocalizationContextHolder.setLocalizationContext(new BrandLocalizationContext() {
@Override
public String getBrand() {
return brandStr;
}
@Override
public LocalizationInfo getLocalizationInfo() {
return LocalizationInfoAssembler.assemble(countryStr, languageStr);
}
});
}
對于定時任務job,,因為所有國家都需要執(zhí)行,所以會把所有國家進行遍歷執(zhí)行,,這也可以通過簡單的注解和AOP來解決,。
四、總結
本文從業(yè)務拓展的角度闡述了在復雜業(yè)務場景下如何通過ThreadLocal,,過渡到InheritableThreadLocal,,再通過TransmittableThreadLocal解決實際業(yè)務問題。因為海外的業(yè)務在不斷的探索中前進,,技術也在不斷的探索中演進,,面對這種復雜多變的情況,我們的應對策略是先做國際化,,再做本地化,,more global才能more local,多國家的隔離只是國際化最基本的起點,,未來還有很多業(yè)務和技術等著我們?nèi)ヌ魬?zhàn),。