A Look at HystrixCommandExecutionHook
Published: 2019-06-28

This post takes a look at HystrixCommandExecutionHook.

HystrixCommandExecutionHook

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/executionhook/HystrixCommandExecutionHook.java

/**
 * Abstract ExecutionHook with invocations at different lifecycle points of {@link HystrixCommand}
 * and {@link HystrixObservableCommand} execution with default no-op implementations.
 * <p>
 * See {@link HystrixPlugins} or the Hystrix GitHub Wiki for information on configuring plugins: https://github.com/Netflix/Hystrix/wiki/Plugins.
 * <p>
 * Note on thread-safety and performance
 * <p>
 * A single implementation of this class will be used globally so methods on this class will be invoked concurrently from multiple threads so all functionality must be thread-safe.
 * <p>
 * Methods are also invoked synchronously and will add to execution time of the commands so all behavior should be fast. If anything time-consuming is to be done it should be spawned asynchronously
 * onto separate worker threads.
 *
 * @since 1.2
 * */
public abstract class HystrixCommandExecutionHook {

    /**
     * Invoked before {@link HystrixInvokable} begins executing.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.2
     */
    public <T> void onStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when {@link HystrixInvokable} emits a value.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param value value emitted
     *
     * @since 1.4
     */
    public <T> T onEmit(HystrixInvokable<T> commandInstance, T value) {
        return value; //by default, just pass through
    }

    /**
     * Invoked when {@link HystrixInvokable} fails with an Exception.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param failureType {@link FailureType} enum representing which type of error
     * @param e exception object
     *
     * @since 1.2
     */
    public <T> Exception onError(HystrixInvokable<T> commandInstance, FailureType failureType, Exception e) {
        return e; //by default, just pass through
    }

    /**
     * Invoked when {@link HystrixInvokable} finishes a successful execution.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.4
     */
    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked at start of thread execution when {@link HystrixCommand} is executed using {@link ExecutionIsolationStrategy#THREAD}.
     *
     * @param commandInstance The executing HystrixCommand instance.
     *
     * @since 1.2
     */
    public <T> void onThreadStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked at completion of thread execution when {@link HystrixCommand} is executed using {@link ExecutionIsolationStrategy#THREAD}.
     * This will get invoked whenever the Hystrix thread is done executing, regardless of whether the thread finished
     * naturally, or was unsubscribed externally
     *
     * @param commandInstance The executing HystrixCommand instance.
     *
     * @since 1.2
     */
    public <T> void onThreadComplete(HystrixInvokable<T> commandInstance) {
        // do nothing by default
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} starts.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.4
     */
    public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} emits a value.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param value value emitted
     *
     * @since 1.4
     */
    public <T> T onExecutionEmit(HystrixInvokable<T> commandInstance, T value) {
        return value; //by default, just pass through
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} fails with an Exception.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param e exception object
     *
     * @since 1.4
     */
    public <T> Exception onExecutionError(HystrixInvokable<T> commandInstance, Exception e) {
        return e; //by default, just pass through
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} completes successfully.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.4
     */
    public <T> void onExecutionSuccess(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the fallback method in {@link HystrixInvokable} starts.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.2
     */
    public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the fallback method in {@link HystrixInvokable} emits a value.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param value value emitted
     *
     * @since 1.4
     */
    public <T> T onFallbackEmit(HystrixInvokable<T> commandInstance, T value) {
        return value; //by default, just pass through
    }

    /**
     * Invoked when the fallback method in {@link HystrixInvokable} fails with an Exception.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param e exception object
     *
     * @since 1.2
     */
    public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) {
        //by default, just pass through
        return e;
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} completes successfully.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.4
     */
    public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the command response is found in the {@link com.netflix.hystrix.HystrixRequestCache}.
     *
     * @param commandInstance The executing HystrixCommand
     *
     * @since 1.4
     */
    public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked with the command is unsubscribed before a terminal state
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.5.9
     */
    public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }
}

The class defines the following hook methods:

  • onStart
  • onEmit
  • onError
  • onSuccess
  • onThreadStart
  • onThreadComplete
  • onExecutionStart
  • onExecutionEmit
  • onExecutionError
  • onExecutionSuccess
  • onFallbackStart
  • onFallbackEmit
  • onFallbackError
  • onFallbackSuccess
  • onCacheHit
  • onUnsubscribe
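
To make the thread-safety note in the Javadoc concrete, here is a minimal, self-contained sketch. SketchHook is a hypothetical stand-in that mirrors only three of the methods listed above, with simplified signatures; it is not the Hystrix class, so the example runs without hystrix-core on the classpath. The subclass overrides only the callbacks it needs and keeps its state in LongAdder counters, so one shared instance can be called from many threads.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

class CountingHookDemo {
    // Simulates threads * perThread commands that all share one hook instance.
    static CountingHook run(int threads, int perThread) {
        CountingHook hook = new CountingHook();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    hook.onStart("cmd");
                    if (i % 2 == 0) {
                        hook.onError("cmd", new RuntimeException("boom"));
                    } else {
                        hook.onSuccess("cmd");
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish");
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return hook;
    }

    public static void main(String[] args) {
        CountingHook hook = run(4, 1000);
        System.out.println("started=" + hook.started.sum() + " failed=" + hook.failed.sum());
    }
}

// Hypothetical stand-in for HystrixCommandExecutionHook: every method is a no-op or pass-through.
abstract class SketchHook {
    public void onStart(Object commandInstance) { /* do nothing by default */ }
    public void onSuccess(Object commandInstance) { /* do nothing by default */ }
    public Exception onError(Object commandInstance, Exception e) { return e; }
}

// Overrides two callbacks; LongAdder keeps concurrent increments safe and cheap.
class CountingHook extends SketchHook {
    final LongAdder started = new LongAdder();
    final LongAdder failed = new LongAdder();

    @Override
    public void onStart(Object commandInstance) {
        started.increment();
    }

    @Override
    public Exception onError(Object commandInstance, Exception e) {
        failed.increment();
        return e; // still pass the original exception through
    }
}
```

Against the real library, the equivalent class would extend HystrixCommandExecutionHook and would typically be registered through HystrixPlugins.getInstance().registerCommandExecutionHook(...) before any command runs.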

HystrixCommandExecutionHookDefault

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/strategy/executionhook/HystrixCommandExecutionHookDefault.java

/**
 * Default implementations of {@link HystrixCommandExecutionHook} that does nothing.
 *
 * @ExcludeFromJavadoc
 */
public class HystrixCommandExecutionHookDefault extends HystrixCommandExecutionHook {

    private static HystrixCommandExecutionHookDefault INSTANCE = new HystrixCommandExecutionHookDefault();

    private HystrixCommandExecutionHookDefault() {

    }

    public static HystrixCommandExecutionHook getInstance() {
        return INSTANCE;
    }

}

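The default implementation does not override any of the hook methods, so every callback keeps its no-op or pass-through behavior.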

Callbacks

The class hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java invokes the executionHook callbacks from the corresponding methods, for example:

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }

                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                    // the command timed out in the wrapping thread so we will return immediately
                    // and not increment any of the counters below or other such logic
                    return Observable.error(new RuntimeException("timed out before executing run()"));
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                    //we have not been unsubscribed, so should proceed
                    HystrixCounters.incrementGlobalConcurrentThreads();
                    threadPool.markThreadExecution();
                    // store the command that is being run
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    executionResult = executionResult.setExecutedInThread();
                    /**
                     * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
                     */
                    try {
                        executionHook.onThreadStart(_cmd);
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    //command has already been unsubscribed, so return immediately
                    return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                }
            }
        }).doOnTerminate(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                    //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                }
                //if it was unsubscribed, then other cleanup handled it
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                    //if it was never started and was cancelled, then no need to clean up
                }
                //if it was terminal, then other cleanup handled it
            }
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } else {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }

                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                // semaphore isolated
                // store the command that is being run
                endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                try {
                    executionHook.onRunStart(_cmd);
                    executionHook.onExecutionStart(_cmd);
                    return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                } catch (Throwable ex) {
                    //If the above hooks throw, then use that as the result of the run method
                    return Observable.error(ex);
                }
            }
        });
    }
}

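With THREAD isolation this method calls the executionHook methods onThreadStart, onRunStart, and onExecutionStart, in that order; with SEMAPHORE isolation it calls only onRunStart and onExecutionStart. (onRunStart is not in the listing above; in this version it appears to be a deprecated predecessor of onExecutionStart.) In both branches, an exception thrown by a hook is returned as Observable.error, so it looks as if the user code itself had failed.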
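
The try/catch around the hook calls is easier to see without the RxJava plumbing. The following is a simplified sketch, not Hystrix code: RecordingHook and execute are hypothetical names, the hook methods take no arguments, and a plain String stands in for the returned Observable. It shows the call order of the THREAD branch and how a throwing hook is reported the same way as a failure in user code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class HookOrderDemo {
    // Hypothetical stand-in for the three start callbacks; records the call order.
    // Not thread-safe: it is only used from a single thread in this demo.
    static class RecordingHook {
        final List<String> calls = new ArrayList<>();
        final boolean failOnExecutionStart;

        RecordingHook(boolean failOnExecutionStart) {
            this.failOnExecutionStart = failOnExecutionStart;
        }

        void onThreadStart() { calls.add("onThreadStart"); }

        void onRunStart() { calls.add("onRunStart"); }

        void onExecutionStart() {
            calls.add("onExecutionStart");
            if (failOnExecutionStart) {
                throw new IllegalStateException("hook failed");
            }
        }
    }

    // Mirrors the shape of the THREAD branch: hooks first, then user code,
    // and any Throwable from either becomes the error result.
    static String execute(RecordingHook hook, Supplier<String> userCode) {
        try {
            hook.onThreadStart();
            hook.onRunStart();
            hook.onExecutionStart();
            return "value:" + userCode.get();
        } catch (Throwable ex) {
            return "error:" + ex.getMessage();
        }
    }

    public static void main(String[] args) {
        RecordingHook ok = new RecordingHook(false);
        System.out.println(execute(ok, () -> "done") + " " + ok.calls);

        RecordingHook bad = new RecordingHook(true);
        System.out.println(execute(bad, () -> "done") + " " + bad.calls);
    }
}
```

In the second call the user code never runs, which is one practical reason to keep hook implementations small and free of surprises.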

Summary

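HystrixCommandExecutionHook provides hook methods for the lifecycle of HystrixCommand and HystrixObservableCommand. Developers can supply their own implementation to do extra work at these points, such as logging, overriding the response, or changing thread state.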
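
As an illustration of overriding the response, here is a small sketch built on the pass-through shape of onEmit and onError. PassThroughHook is a hypothetical stand-in with simplified signatures (the real methods take a HystrixInvokable, and onError also takes a FailureType), and the trimming and wrapping rules are made up for the example.

```java
class ResponseOverrideDemo {
    // Plays the role of the framework: every emitted value goes through the hook.
    static <T> T emit(PassThroughHook hook, T raw) {
        return hook.onEmit("cmd", raw);
    }

    public static void main(String[] args) {
        PassThroughHook hook = new TrimmingHook();
        System.out.println("[" + emit(hook, "  ok  ") + "]");
        System.out.println(emit(hook, 42));
        System.out.println(hook.onError("cmd", new RuntimeException("boom")).getMessage());
    }
}

// Hypothetical stand-in: by default, values and exceptions pass through unchanged.
abstract class PassThroughHook {
    public <T> T onEmit(Object commandInstance, T value) { return value; }
    public Exception onError(Object commandInstance, Exception e) { return e; }
}

class TrimmingHook extends PassThroughHook {
    // Replaces String responses with a trimmed copy; other types pass through.
    @Override
    @SuppressWarnings("unchecked")
    public <T> T onEmit(Object commandInstance, T value) {
        if (value instanceof String) {
            return (T) ((String) value).trim(); // safe: T is String in this branch
        }
        return value;
    }

    // Wraps the failure with extra context and keeps the original as the cause.
    @Override
    public Exception onError(Object commandInstance, Exception e) {
        return new RuntimeException("command failed: " + e.getMessage(), e);
    }
}
```

Because the hook is global, a rewrite like this affects every command, so in practice the override would usually check commandInstance before touching the value.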

Reposted from: http://giiql.baihongyu.com/
