原创

解决hystrix隔离策略导致RequestContextHolder.getRequestAttributes()返回null的问题

什么是Hystrix隔离策略?

官方文档:https://github.com/Netflix/Hystrix/wiki/Configuration#executionisolationstrategy

执行隔离策略

此属性指示HystrixCommand.run()执行哪种隔离策略,是以下两种选择之一:

  • THREAD —它在单独的线程上执行,并发请求受线程池中线程数的限制
  • SEMAPHORE —它在调用线程上执行,并发请求受信号量限制

问题

当隔离策略为 THREAD 时,是没办法拿到 ThreadLocal 中的值的。

所以,当服务间进行feign远程调用的时候,就无法从请求头中获取request对象,进而拿到token了。

解决办法1 - 暴力修改Hystrix隔离策略

隔离策略设为SEMAPHORE

hystrix.command.default.execution.isolation.strategy: SEMAPHORE

但是Hystrix官方不推荐这种做法,并且强烈建议使用SEMAPHORE作为隔离策略。

Thread or Semaphore

The default, and the recommended setting, is to run HystrixCommands using thread isolation (THREAD) and HystrixObservableCommands using semaphore isolation (SEMAPHORE).

Commands executed in threads have an extra layer of protection against latencies beyond what network timeouts can offer.

Generally the only time you should use semaphore isolation for HystrixCommands is when the call is so high volume (hundreds per second, per instance) that the overhead of separate threads is too high; this typically only applies to non-network calls.

解决办法2 - 自定义并发策略

自定义并发策略

编写一个类,让其继承HystrixConcurrencyStrategy ,并重写wrapCallable 方法即可。

CustomFeignHystrixConcurrencyStrategy:

package com.lzhpo.common.feign.strategy;

import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariable;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
import com.netflix.hystrix.strategy.properties.HystrixProperty;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Custom isolation strategy to solve the problem that {@code RequestContextHolder.getRequestAttributes()} is empty
 *
 * @author Zhaopo Liu
 */
@Slf4j
@Primary
@Component
public class CustomFeignHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

    private HystrixConcurrencyStrategy hystrixConcurrencyStrategy;

    public CustomFeignHystrixConcurrencyStrategy() {
        try {
            this.hystrixConcurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            if (this.hystrixConcurrencyStrategy instanceof CustomFeignHystrixConcurrencyStrategy) {
                // Welcome to singleton hell...
                return;
            }
            HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
            HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
            this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher, propertiesStrategy);
            HystrixPlugins.reset();
            HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
            HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
            HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
            HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
            HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
        } catch (Exception e) {
            log.error("Failed to register Sleuth Hystrix Concurrency Strategy", e);
        }
    }

    private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
        HystrixMetricsPublisher metricsPublisher, HystrixPropertiesStrategy propertiesStrategy) {
        if (log.isDebugEnabled()) {
            log.info(
                "Current Hystrix plugins configuration is [" + "concurrencyStrategy [" + this.hystrixConcurrencyStrategy
                    + "]," + "eventNotifier [" + eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
                    + "propertiesStrategy [" + propertiesStrategy + "]," + "]");
            log.info("Registering Sleuth Hystrix Concurrency Strategy.");
        }
    }

    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        return new WrappedCallable<>(callable, requestAttributes);
    }

    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize,
        HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
        return this.hystrixConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
            keepAliveTime, unit, workQueue);
    }

    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
        HystrixThreadPoolProperties threadPoolProperties) {
        return this.hystrixConcurrencyStrategy.getThreadPool(threadPoolKey, threadPoolProperties);
    }

    @Override
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        return this.hystrixConcurrencyStrategy.getBlockingQueue(maxQueueSize);
    }

    @Override
    public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) {
        return this.hystrixConcurrencyStrategy.getRequestVariable(rv);
    }

    static class WrappedCallable<T> implements Callable<T> {
        private final Callable<T> target;
        private final RequestAttributes requestAttributes;

        public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
            this.target = target;
            this.requestAttributes = requestAttributes;
        }

        /**
         * feign opens the fuse (hystrix): feign.hystrix.enabled=ture, and uses the default signal isolation level,
         * The HttpServletRequest object is independent of each other in the parent thread
         * and the child thread and is not shared.
         * So the HttpServletRequest data of the parent thread used in the child thread is null,
         * naturally it is impossible to obtain the token information of the request header
         * In a multithreaded environment, call before the request, set the context before the call
         *
         * @return T
         * @throws Exception Exception
         */
        @Override
        public T call() throws Exception {
            try {
                // Set true to share the parent thread's HttpServletRequest object setting
                RequestContextHolder.setRequestAttributes(requestAttributes, true);
                return target.call();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        }
    }
}

SpringCloud集成Feign

配置Feign请求拦截器(请求头携带token)

package com.lzhpo.common.feign.config;

import feign.RequestInterceptor;
import feign.RequestTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Inter-service calls carry Authorization request header
 *
 * @author Zhaopo Liu
 */
@Slf4j
@Configuration
public class CustomFeignConfig implements RequestInterceptor {

    private static final String TOKEN_HEADER = "authorization";

    @Override
    public void apply(RequestTemplate requestTemplate) {
        HttpServletRequest request = getHttpServletRequest();
        if (request != null) {
            String token = getHeaders(request).get(TOKEN_HEADER);
            log.info("Get token: {}", token);
            requestTemplate.header(TOKEN_HEADER, token);
        }
    }

    private HttpServletRequest getHttpServletRequest() {
        try {
            ServletRequestAttributes attributes = (ServletRequestAttributes)RequestContextHolder.getRequestAttributes();
            if (attributes != null) {
                return attributes.getRequest();
            }
            log.error("ServletRequestAttributes is empty, unable to obtain ServletRequestAttributes.");
            return null;
        } catch (Exception e) {
            log.error("Failed to get ServletRequestAttributes: {}", e.getMessage());
            return null;
        }
    }

    private Map<String, String> getHeaders(HttpServletRequest request) {
        Map<String, String> map = new LinkedHashMap<>();
        Enumeration<String> enumeration = request.getHeaderNames();
        while (enumeration.hasMoreElements()) {
            String key = enumeration.nextElement();
            String value = request.getHeader(key);
            map.put(key, value);
        }
        return map;
    }
}

配置OkHttp

package com.lzhpo.common.feign.config;

import com.lzhpo.common.feign.interceptor.LogInterceptor;
import feign.Feign;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.openfeign.FeignAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.TimeUnit;

/**
 * OkHttp config
 *
 * @author Zhaopo Liu
 */
@Configuration
@ConditionalOnClass(Feign.class)
@AutoConfigureBefore(FeignAutoConfiguration.class)
public class FeignOkHttpConfig {

    @Bean
    public OkHttpClient okHttpClient() {
        return new OkHttpClient.Builder()
                // Connect timeout
                .connectTimeout(60, TimeUnit.SECONDS)
                // Read timeout
                .readTimeout(60, TimeUnit.SECONDS)
                // Write timeout
                .writeTimeout(60, TimeUnit.SECONDS)
                // Whether to reconnect automatically
                .retryOnConnectionFailure(true).connectionPool(new ConnectionPool())
                // Log interceptor
                .addInterceptor(new LogInterceptor())
                .build();
    }
}

配置RestTemplate

package com.lzhpo.common.feign.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

/**
 * RestTemplateConfig
 *
 * @author Zhaopo Liu
 */
@Configuration
public class RestTemplateConfig {

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

日志拦截器

package com.lzhpo.common.feign.interceptor;

import lombok.extern.slf4j.Slf4j;
import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;

import java.io.IOException;

/**
 * Log interceptor
 *
 * @author Zhaopo Liu
 */
@Slf4j
public class LogInterceptor implements Interceptor {

    @Override
    public Response intercept(Chain chain) throws IOException {
        long t1 = System.nanoTime();
        Request request = chain.request();
        log.info(String.format("sending %s request %s%n%s", request.method(), request.url(), request.headers()));
        Response response = chain.proceed(request);
        long t2 = System.nanoTime();
        log.info(String.format("received response for %s in %.1fms%n%s", response.request().url(), (t2 - t1) / 1e6d,
            response.headers()));
        return response;
    }
}

自动注入配置

我是common-feign依赖,需要配置spring.factories自动注入。

所以,配置:/resources/META-INF/spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.lzhpo.common.feign.config.CustomFeignConfig,\
  com.lzhpo.common.feign.config.FeignOkHttpConfig,\
  com.lzhpo.common.feign.config.RestTemplateConfig,\
  com.lzhpo.common.feign.strategy.CustomFeignHystrixConcurrencyStrategy

------集成用户服务demo------

配置文件

application.yml

# feign config
feign:
  httpclient:
    enabled: false
  okhttp:
    enabled: true
  hystrix:
    enabled: true

# hystrix config
hystrix:
  shareSecurityContext: true
  command:
    default:
      execution:
        isolation:
          thread:
            timeoutInMilliseconds: 60000

management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    health:
      show-details: ALWAYS

用户服务Client

package com.lzhpo.systemservice.api.feign;

import com.lzhpo.common.constant.SysConst;
import com.lzhpo.common.feign.config.CustomFeignConfig;
import com.lzhpo.common.response.ResultVo;
import com.lzhpo.common.entity.SecurityUser;
import com.lzhpo.systemservice.api.entity.SysRole;
import com.lzhpo.systemservice.api.feign.factory.UserServiceClientFallbackFactory;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;

import java.util.List;

/** @author Zhaopo Liu */
@FeignClient(value = SysConst.SYSTEM_SERVICE, configuration = CustomFeignConfig.class,
        fallbackFactory = UserServiceClientFallbackFactory.class)
public interface UserServiceClient {

    @GetMapping("/api/user/v1/user/findUserInfoByUsername")
    ResultVo<SecurityUser> findUserInfoByUsername(@RequestParam("username") String username);
}

日志断路器,Feign回退方法

package com.lzhpo.systemservice.api.feign.fallback;

import com.lzhpo.common.response.ResultVo;
import com.lzhpo.common.entity.SecurityUser;
import com.lzhpo.systemservice.api.entity.SysRole;
import com.lzhpo.systemservice.api.feign.UserServiceClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Log circuit breaker implementation, fallback method called by feign
 *
 * @author Zhaopo Liu
 */
@Slf4j
@Component
public class UserServiceClientFallbackImpl implements UserServiceClient {

    private Throwable throwable;

    public Throwable getThrowable() {
        return throwable;
    }

    public void setThrowable(Throwable throwable) {
        this.throwable = throwable;
    }

    @Override
    public ResultVo<SecurityUser> findUserInfoByUsername(String username) {
        log.error("Query user information based on user name [{}] failed! Exception information: {}", username, throwable.getMessage());
        return null;
    }
}

用户断路器工厂

package com.lzhpo.systemservice.api.feign.factory;

import com.lzhpo.systemservice.api.feign.UserServiceClient;
import com.lzhpo.systemservice.api.feign.fallback.UserServiceClientFallbackImpl;
import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * User circuit breaker factory
 *
 * @author Zhaopo Liu
 */
@Component
@Slf4j
public class UserServiceClientFallbackFactory implements FallbackFactory<UserServiceClient> {

    @Override
    public UserServiceClient create(Throwable throwable) {
        UserServiceClientFallbackImpl userServiceClientFallback = new UserServiceClientFallbackImpl();
        userServiceClientFallback.setThrowable(throwable);
        log.error("The user circuit breaker detects the feign abnormality, the reason for the abnormality: {}", throwable.getMessage());
        return userServiceClientFallback;
    }
}
正文到此结束
本文目录