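Goals

  • Extend Ribbon to support gray (canary) routing
  • Add support for Zuul
  • Add support for service-to-service calls
  • Practical use: make testing and debugging in the development environment workable. With microservices, a developer has to start a large number of services locally, which costs a lot of memory and time. In our actual project, debugging and testing were not even possible on a 16 GB machine.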

Approach

Zuul calling a service

  • Use Eureka's metadata-map mechanism to attach some metadata to a service instance at startup
eureka.instance.metadata-map.developer=qingmu
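  • Pass the parameter developer=qingmu when calling the Zuul gateway
  • Extend a Zuul filter to read developer from the request and store it in a ThreadLocal
  • Extend Ribbon by overriding ZoneAvoidanceRule.choose. Before a server is returned, read the developer value stored in the ThreadLocal, fetch allServers, and check each server's metadata-map for developer=qingmu. Matching servers are collected into a new list, and the server is chosen from that list. If nothing matches, fall back to the existing default behaviour. This gives Zuul gray routing to services.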
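
The selection step described above can be sketched in plain Java with no Ribbon dependency. Instance, GraySelector and select are hypothetical names used only for illustration; the real rule works on Ribbon Server objects and leaves the final pick to the load balancer. The sketch also assumes that "no match" falls back to the untagged instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a registry instance and its metadata-map. */
final class Instance {
    final String address;
    final Map<String, String> metadata;

    Instance(String address, Map<String, String> metadata) {
        this.address = address;
        this.metadata = metadata;
    }
}

final class GraySelector {

    /**
     * Returns the instances tagged with the requested developer. If there are none
     * (or no developer was requested), returns the untagged instances instead.
     */
    static List<Instance> select(List<Instance> all, String requestedDeveloper) {
        List<Instance> matched = new ArrayList<>();
        List<Instance> untagged = new ArrayList<>();
        for (Instance instance : all) {
            String developer = instance.metadata.get("developer");
            if (developer == null || developer.isEmpty()) {
                untagged.add(instance);
            } else if (developer.equals(requestedDeveloper)) {
                matched.add(instance);
            }
            // Instances tagged for a different developer are never candidates.
        }
        return matched.isEmpty() ? untagged : matched;
    }

    public static void main(String[] args) {
        List<Instance> all = Arrays.asList(
                new Instance("10.0.0.1:8080", Collections.<String, String>emptyMap()),
                new Instance("192.168.1.7:8080", Collections.singletonMap("developer", "qingmu")),
                new Instance("192.168.1.9:8080", Collections.singletonMap("developer", "other")));

        for (Instance i : select(all, "qingmu")) {
            System.out.println("gray:    " + i.address);
        }
        for (Instance i : select(all, null)) {
            System.out.println("default: " + i.address);
        }
    }
}
```

A request carrying developer=qingmu only ever sees the instance tagged qingmu, while a request without the parameter only sees the shared, untagged instance.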

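Service calling a service

  • This also relies on Eureka's metadata-map mechanism
  • Zuul forwards the developer parameter to the service it calls
  • When the request reaches a service, a custom interceptor in that service reads developer and stores it in the service's own ThreadLocal for Feign to use later
  • When that service calls another service:
  • First, a Feign request interceptor passes developer on to the next service, so services further down the chain can use it
  • Then the Ribbon extension described above selects the server, which completes gray routing between services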
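
The hand-off between the inbound interceptor and the outbound Feign interceptor can be sketched without Spring or Feign. DeveloperPropagation and its methods are hypothetical names, and requests are modelled as plain maps; this is an illustration of the flow under those assumptions, not the actual interceptor code.

```java
import java.util.HashMap;
import java.util.Map;

final class DeveloperPropagation {

    private static final ThreadLocal<String> DEVELOPER = new ThreadLocal<>();

    /** Inbound side: read developer from the parameter first, then from the header. */
    static void onInbound(Map<String, String> params, Map<String, String> headers) {
        String developer = params.get("developer");
        if (developer == null) {
            developer = headers.get("developer");
        }
        if (developer != null) {
            DEVELOPER.set(developer);
        }
    }

    /** Outbound side: copy the stored value onto the request for the next hop. */
    static void onOutbound(Map<String, String> outgoingHeaders) {
        String developer = DEVELOPER.get();
        if (developer != null && !developer.trim().isEmpty()) {
            outgoingHeaders.put("developer", developer);
        }
    }

    /** Must run when the request finishes, because container threads are reused. */
    static void onCompleted() {
        DEVELOPER.remove();
    }

    public static void main(String[] args) {
        Map<String, String> inboundHeaders = new HashMap<>();
        inboundHeaders.put("developer", "qingmu");

        onInbound(new HashMap<String, String>(), inboundHeaders);
        Map<String, String> outgoing = new HashMap<>();
        onOutbound(outgoing);
        System.out.println("next hop headers: " + outgoing);

        onCompleted();
        Map<String, String> afterCleanup = new HashMap<>();
        onOutbound(afterCleanup);
        System.out.println("after cleanup:    " + afterCleanup);
    }
}
```

The cleanup step matters as much as the propagation: without it, a later request served by the same thread would inherit the previous caller's developer value.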

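Notes

  • Because the parameter is passed implicitly through a ThreadLocal, Hystrix's THREAD (thread pool) isolation mode cannot be used as is.
  • The thread pool implementations in the JDK do not carry ThreadLocal values over to pool threads.
  • So for gray routing, semaphore isolation can be used instead
  • Set strategy to SEMAPHORE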

hystrix.command.default.execution.isolation.strategy: SEMAPHORE
  • Alternatively, pass the ThreadLocal value along with a custom concurrency strategy

  • Extend HystrixConcurrencyStrategy and override wrapCallable

  • RibbonHystrixConcurrencyStrategy.java


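import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariable;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
import com.netflix.hystrix.strategy.properties.HystrixProperty;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j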
public class RibbonHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
    private HystrixConcurrencyStrategy delegate;

    public RibbonHystrixConcurrencyStrategy() {
        try {
            this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
            if (this.delegate instanceof RibbonHystrixConcurrencyStrategy) {
                // Welcome to singleton hell...
                return;
            }
            HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
                    .getInstance().getCommandExecutionHook();
            HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
                    .getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
                    .getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
                    .getPropertiesStrategy();
            this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
                    propertiesStrategy);
            HystrixPlugins.reset();
            HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
            HystrixPlugins.getInstance()
                    .registerCommandExecutionHook(commandExecutionHook);
            HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
            HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
            HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
        } catch (Exception e) {
            log.error("Failed to register Ribbon Hystrix Concurrency Strategy", e);
        }
    }

    private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
                                                 HystrixMetricsPublisher metricsPublisher,
                                                 HystrixPropertiesStrategy propertiesStrategy) {
        if (log.isDebugEnabled()) {
            log.debug("Current Hystrix plugins configuration is ["
                    + "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier ["
                    + eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
                    + "propertiesStrategy [" + propertiesStrategy + "]," + "]");
            log.debug("Registering Ribbon Hystrix Concurrency Strategy.");
        }
    }

    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        return new WrappedCallable<>(callable, RibbonFilterContextHolder.getCurrentContext());
    }

    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
                                            HystrixProperty<Integer> corePoolSize,
                                            HystrixProperty<Integer> maximumPoolSize,
                                            HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
                                            BlockingQueue<Runnable> workQueue) {
        return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
                keepAliveTime, unit, workQueue);
    }

    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
                                            HystrixThreadPoolProperties threadPoolProperties) {
        return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
    }

    @Override
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        return this.delegate.getBlockingQueue(maxQueueSize);
    }

    @Override
    public <T> HystrixRequestVariable<T> getRequestVariable(
            HystrixRequestVariableLifecycle<T> rv) {
        return this.delegate.getRequestVariable(rv);
    }

    public static class WrappedCallable<T> implements Callable<T> {

        private final Callable<T> target;
        private final RibbonFilterContext ribbonFilterContext;

        public WrappedCallable(Callable<T> target, RibbonFilterContext ribbonFilterContext) {
            this.target = target;
            this.ribbonFilterContext = ribbonFilterContext;
        }

        @Override
        public T call() throws Exception {
            try {
                RibbonFilterContextHolder.setCurrentContext(ribbonFilterContext);
                return target.call();
            } finally {
                RibbonFilterContextHolder.clearCurrentContext();
            }
        }
    }
}
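
What wrapCallable does in the class above can be shown without Hystrix: capture the value on the calling thread, then install it on the pool thread around the task. ContextHandOff, wrap and runOnPool are hypothetical names, and a plain String stands in for RibbonFilterContext; treat this as a minimal sketch of the technique.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ContextHandOff {

    static final ThreadLocal<String> DEVELOPER = new ThreadLocal<>();

    /** Captures the caller's value now and installs it around the task when it runs. */
    static <T> Callable<T> wrap(final Callable<T> task) {
        final String captured = DEVELOPER.get();
        return () -> {
            DEVELOPER.set(captured);
            try {
                return task.call();
            } finally {
                // Pool threads are reused, so always clean up afterwards.
                DEVELOPER.remove();
            }
        };
    }

    /** Runs the task on a fresh single-thread pool and returns what it produced. */
    static String runOnPool(Callable<String> task) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Callable<String> readDeveloper = DEVELOPER::get;
        DEVELOPER.set("qingmu");
        System.out.println("plain:   " + runOnPool(readDeveloper));        // plain:   null
        System.out.println("wrapped: " + runOnPool(wrap(readDeveloper)));  // wrapped: qingmu
        DEVELOPER.remove();
    }
}
```

The unwrapped task sees nothing because the value lives on the submitting thread only; the wrapped task sees it because the value was captured at the time wrap was called.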

Implementation

Zuul filter extension

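// RibbonFilter.java
import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;

import static org.springframework.cloud.netflix.zuul.filters.support.FilterConstants.FORM_BODY_WRAPPER_FILTER_ORDER;
import static org.springframework.cloud.netflix.zuul.filters.support.FilterConstants.PRE_TYPE;

@Component
public class RibbonFilter extends ZuulFilter {

    @Override
    public String filterType() {
        return PRE_TYPE;
    }

    @Override
    public int filterOrder() {
        return FORM_BODY_WRAPPER_FILTER_ORDER;
    }

    @Override
    public boolean shouldFilter() {
        return true;
    }

    @Override
    public Object run() {
        // Start from a clean context so nothing carries over from an earlier request on this thread
        RibbonFilterContextHolder.clearCurrentContext();
        RequestContext ctx = RequestContext.getCurrentContext();
        final HttpServletRequest request = ctx.getRequest();
        // A request parameter takes precedence over a header of the same name
        if (request.getParameter("developer") != null) {
            RibbonFilterContextHolder.getCurrentContext()
                    .add("developer", request.getParameter("developer"));
        } else if (request.getHeader("developer") != null) {
            RibbonFilterContextHolder.getCurrentContext()
                    .add("developer", request.getHeader("developer"));
        }
        // Zuul ignores the return value of run()
        return null;
    }
}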

MetadataAwareRule: overriding the Ribbon rule

// MetadataAwareRule.java
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;
import com.netflix.loadbalancer.ZoneAvoidanceRule;
import com.netflix.niws.loadbalancer.DiscoveryEnabledServer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MetadataAwareRule extends ZoneAvoidanceRule {

    @Override
    public Server choose(Object key) {
        final RibbonFilterContext context = RibbonFilterContextHolder.getCurrentContext();
        ILoadBalancer lb = getLoadBalancer();
        final List<Server> allServers = lb.getAllServers();

        // Servers that are tagged, but for a different developer
        final List<Server> metaServers = new ArrayList<>();
        // Servers without a developer tag
        final List<Server> noMetaServers = new ArrayList<>();
        // Servers whose tag matches the requested developer
        final List<Server> matchedMetaServers = new ArrayList<>();

        // The developer value passed in by the caller
        final String inputDeveloper = context.getAttributes().get("developer");

        for (Server server : allServers) {
            if (server instanceof DiscoveryEnabledServer) {
                final DiscoveryEnabledServer discoveryEnabledServer = (DiscoveryEnabledServer) server;
                final Map<String, String> metadata = discoveryEnabledServer.getInstanceInfo().getMetadata();
                final String developer = metadata.get("developer");
                if (developer == null || developer.isEmpty()) {
                    // No tag: this is an instance running on the shared test server
                    noMetaServers.add(server);
                } else if (developer.equals(inputDeveloper)) {
                    // Tag matches: the developer's own instance
                    matchedMetaServers.add(server);
                } else {
                    // Tagged, but for someone else
                    metaServers.add(server);
                }
            }
        }

        // Prefer the gray route: servers that match the requested developer
        if (!matchedMetaServers.isEmpty()) {
            return getPredicate().chooseRoundRobinAfterFiltering(matchedMetaServers, key).orNull();
        }
        // No match: fall back to the untagged servers
        if (!noMetaServers.isEmpty()) {
            return getPredicate().chooseRoundRobinAfterFiltering(noMetaServers, key).orNull();
        }
        // Enable as needed: fall back to servers tagged for other developers
        // return getPredicate().chooseRoundRobinAfterFiltering(metaServers, key).orNull();
        return null;
    }
}

ThreadLocal variable wrapper

// RibbonFilterContext.java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class RibbonFilterContext {

    private final Map<String, String> attributes = new HashMap<>();

    public RibbonFilterContext add(String key, String value) {
        attributes.put(key, value);
        return this;
    }

    public String get(String key) {
        return attributes.get(key);
    }

    public RibbonFilterContext remove(String key) {
        attributes.remove(key);
        return this;
    }

    public Map<String, String> getAttributes() {
        return Collections.unmodifiableMap(attributes);
    }
}

public class RibbonFilterContextHolder {

    private static final ThreadLocal<RibbonFilterContext> contextHolder = new InheritableThreadLocal<RibbonFilterContext>() {
        @Override
        protected RibbonFilterContext initialValue() {
            return new RibbonFilterContext();
        }
    };

    public static RibbonFilterContext getCurrentContext() {
        return contextHolder.get();
    }

    public static void setCurrentContext(RibbonFilterContext context) {
        contextHolder.set(context);
    }

    public static void clearCurrentContext() {
        contextHolder.remove();
    }
}
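
The holder above is an InheritableThreadLocal, which copies the parent's value only when a thread is created. The sketch below illustrates why that alone is not enough for pooled threads; InheritableCaveat and observe are hypothetical names, and a String stands in for the context object.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class InheritableCaveat {

    static final ThreadLocal<String> DEVELOPER = new InheritableThreadLocal<>();

    /** Returns what two consecutive tasks on the same pooled thread observe. */
    static String[] observe() {
        Callable<String> readDeveloper = DEVELOPER::get;
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            DEVELOPER.set("qingmu");
            // The worker thread is created by this first submit and copies "qingmu".
            String first = pool.submit(readDeveloper).get();

            DEVELOPER.set("other");
            // The same worker is reused, so it still holds the value copied at creation.
            String second = pool.submit(readDeveloper).get();

            return new String[] {first, second};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            DEVELOPER.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = observe();
        System.out.println("first task:  " + seen[0]);  // first task:  qingmu
        System.out.println("second task: " + seen[1]);  // second task: qingmu
    }
}
```

The second task reports a stale value, which is the reason the post either switches Hystrix to SEMAPHORE isolation or wraps each Callable explicitly.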

Activating the custom rule


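import com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

@Configuration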
@ConditionalOnClass(DiscoveryEnabledNIWSServerList.class)
@AutoConfigureBefore(RibbonClientConfiguration.class)
public class RibbonMetaFilterAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public MetadataAwareRule metadataAwareRule() {
        return new MetadataAwareRule();
    }

    /**
     * Optional: decide for yourself whether to enable this strategy.
     * @return the custom Hystrix concurrency strategy
     */
    @Bean
    public RibbonHystrixConcurrencyStrategy ribbonHystrixConcurrencyStrategy(){
        return new RibbonHystrixConcurrencyStrategy();
    }

}

Extending Feign to pass developer along


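import feign.RequestInterceptor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.ModelAndView;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@Configuration
@EnableWebMvc
public class MyAutoConfigurationBefore extends WebMvcConfigurerAdapter {

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new HandlerInterceptor() {
            @Override
            public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
                // Inbound: store developer (parameter first, then header) for later Feign calls
                if (request.getParameter("developer") != null) {
                    RibbonFilterContextHolder.getCurrentContext()
                            .add("developer", request.getParameter("developer"));
                } else if (request.getHeader("developer") != null) {
                    RibbonFilterContextHolder.getCurrentContext()
                            .add("developer", request.getHeader("developer"));
                }
                return true;
            }

            @Override
            public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
                // Nothing to do
            }

            @Override
            public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
                // Container threads are reused, so clear the context when the request ends
                RibbonFilterContextHolder.clearCurrentContext();
            }
        });
    }

    /**
     * Outbound: copy developer onto every Feign request as a header.
     * @see feign.RequestInterceptor
     * @return the Feign request interceptor
     */
    @Bean
    public RequestInterceptor headerInterceptor() {
        return requestTemplate -> {
            final String developer = RibbonFilterContextHolder.getCurrentContext().get("developer");
            if (StringUtils.isNotBlank(developer)) {
                requestTemplate.header("developer", developer);
            }
        };
    }
}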
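  • That completes the gray routing
  • In short, a ThreadLocal stores the special parameter passed in by the front-end caller
  • Before the call is made, the load balancer's choose method is intercepted and all servers obtained from the registry are matched against that parameter
  • If a server matches, it is used; if none matches, the default path is taken.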