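Calls to external services can fail for many reasons: sometimes because of a network failure, sometimes because the external service is too busy, sometimes because of a problem in our own business logic, and for many other reasons besides.
When a failure is detected, there are several strategies for handling it, such as cancelling the request. In some cases we do not want to cancel; instead, we want to retry until we get the desired result or until we can no longer afford to send more requests. How many times we retry, and how long we wait before sending the next request, depends on our business requirements.
A common retry strategy for this situation is exponential backoff. When a request fails, we do not send the next request immediately; instead, we wait for some time first. The wait grows exponentially after each attempt, and once it reaches a certain maximum delay we stop increasing it, hence the term "backoff".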
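The wait time for each retry is simply given by an exponential function:
\( t = b^c \)
where \(t\) is the delay, \(b\) is the base (the multiplicative factor), and \(c\) is incremented each time a failure occurs. For example, with delays measured in seconds and \(b = 2\), after 3 failed attempts we must wait \(2^3 = 8\) seconds before sending the fourth request.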
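To make the formula concrete, here is a minimal sketch that tabulates the delay for the first few failures with base 2. The class and method names (`BackoffFormula`, `delaySeconds`) are made up for this illustration and are not part of the article's code; no cap or jitter is applied here.

```java
// Hypothetical helper for illustration only; not part of the article's code.
final class BackoffFormula {

    /** Returns b^c: the delay in seconds to wait after the c-th consecutive failure. */
    static long delaySeconds(int base, int failures) {
        return (long) Math.pow(base, failures);
    }

    public static void main(String[] args) {
        // With base 2 the delays are 2, 4, 8 and 16 seconds.
        for (int c = 1; c <= 4; c++) {
            System.out.printf("after failure %d: wait %d s%n", c, delaySeconds(2, c));
        }
    }
}
```

The delay doubles with every failure, which is why a real implementation usually caps it at some maximum.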
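When should you retry?
Here are some reasons to implement a retry strategy (the list is not exhaustive):
The context of the failure is fully understood.
Repeating a previously failed request can succeed on a subsequent attempt.
The cost of a failed request is greater than the cost of retrying.
The faults are expected to be transient.
Etc.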
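One way to turn these criteria into code is a small predicate that decides whether another attempt is worthwhile. The sketch below is only an illustration: the class name `RetryDecision` is hypothetical, and the set of HTTP status codes treated as transient is an assumption that each application has to choose for itself.

```java
import java.util.Set;

// Hypothetical helper for illustration only; not part of the article's code.
final class RetryDecision {

    // Assumption: these status codes are treated as transient in this sketch
    // (request timeout, too many requests, bad gateway, service unavailable, gateway timeout).
    private static final Set<Integer> TRANSIENT_STATUS = Set.of(408, 429, 502, 503, 504);

    /** Retries only if the failure looks transient and there are attempts left. */
    static boolean shouldRetry(int statusCode, int attemptsMade, int maxAttempts) {
        return attemptsMade < maxAttempts && TRANSIENT_STATUS.contains(statusCode);
    }
}
```

A 404, for example, is not in the set, so repeating the request would be skipped because it is unlikely to succeed on a later attempt.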
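Implementation
We will build a simple retry mechanism in Java. An HTTP client sends requests to an HTTP server, and each request can succeed or fail. When a request fails, we resend it until a success message arrives or until we cannot afford any more attempts.
First, we define the BackoffStrategy interface, which has exactly one method, get():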
```java
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

public interface BackoffStrategy {
    /**
     * Tests the supplier value with the provided predicate, if the predicate
     * evaluates to false, then the supplier value is repeatedly provided to the predicate
     * till it's evaluated to true or maximum number of attempts are reached, then
     * return the supplier value wrapped in an Optional, which is potentially an empty optional.
     *
     * @param supplier    the action to be performed that potentially can fail
     * @param predicate   the predicate to test the supplier value
     * @param numAttempts maximum number of attempts to perform the supplier action
     * @param <T>         return type of the supplier value
     * @return the supplier value wrapped in an optional, possibly empty.
     */
    <T> Optional<T> get(Supplier<T> supplier, Predicate<T> predicate, int numAttempts);
}
```
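The first parameter supplies the result we want to obtain; in our case, it sends a request to some endpoint and receives a "SUCCESS" or "FAILURE" message. The second parameter tests the value provided by the first, and the third is the maximum number of requests we are willing to make. Next comes the ExponentialBackoffStrategy implementation: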
```java
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExponentialBackoffStrategy implements BackoffStrategy {
    private static final Logger log = LoggerFactory.getLogger(ExponentialBackoffStrategy.class);
    private final TimeDelayProvider timeDelayProvider;

    public ExponentialBackoffStrategy(TimeDelayProvider timeDelayProvider) {
        this.timeDelayProvider = timeDelayProvider;
    }

    @Override
    public <T> Optional<T> get(Supplier<T> supplier, Predicate<T> predicate, int maxAttempts) {
        // The first attempt happens immediately, without any delay.
        T t = supplier.get();
        int attempts = 1;
        while (!predicate.test(t) && attempts < maxAttempts) {
            try {
                long time = timeDelayProvider.getDelay(attempts);
                log.info("Predicate tested fail!! Retry in: {}ms", time);
                Thread.sleep(time);
                t = supplier.get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag only when the thread was actually interrupted.
                Thread.currentThread().interrupt();
                log.error("Fail to get the result: {}", e.getMessage(), e);
                return Optional.empty();
            } catch (RuntimeException e) {
                log.error("Fail to get the result: {}", e.getMessage(), e);
                return Optional.empty();
            } finally {
                ++attempts;
            }
        }
        log.info("Total requests have tried: {}", attempts);
        // ofNullable: a supplier that returns null yields an empty Optional instead of an exception.
        return Optional.ofNullable(t);
    }
}
```
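This class introduces a dependency, TimeDelayProvider, which supplies the delay for each subsequent request according to the exponential function described at the beginning: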
```java
import java.util.concurrent.ThreadLocalRandom;

public class ExponentialTimeDelayProvider implements TimeDelayProvider {
    private final int base;
    private final long maxBackOffTime;

    public ExponentialTimeDelayProvider(int base, long maxBackOffTime) {
        this.base = base;
        this.maxBackOffTime = maxBackOffTime;
    }

    @Override
    public long getDelay(int noAttempts) {
        // base^noAttempts seconds, converted to milliseconds below.
        double pow = Math.pow(base, noAttempts);
        // Up to 999 ms of random jitter. ThreadLocalRandom.current() is called here rather than
        // cached in a field, because the instance it returns belongs to the calling thread.
        int extraDelay = ThreadLocalRandom.current().nextInt(1000);
        return (long) Math.min(pow * 1000 + extraDelay, maxBackOffTime);
    }

    @Override
    public long maxBackoff() {
        return this.maxBackOffTime;
    }
}
```
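The getDelay function computes the delay for the given attempt number: base^noAttempts seconds, converted to milliseconds. It adds a small random extra delay (up to one second) so that not all clients resend their requests at the same moment, and it caps the result at maxBackOffTime. maxBackoff returns the maximum delay a request ever has to wait.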
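The TimeDelayProvider interface itself is not shown in the article. The sketch below gives one plausible shape for it, inferred from the two overridden methods, together with a hypothetical jitter-free implementation (`DeterministicDelayProvider`, a name invented here) that makes the capped delays easy to inspect.

```java
// Assumed shape of the interface; the article only shows an implementation of it.
interface TimeDelayProvider {
    long getDelay(int noAttempts); // delay in milliseconds before the next attempt
    long maxBackoff();             // upper bound on any delay, in milliseconds
}

// Hypothetical jitter-free variant, useful when predictable delays are needed.
final class DeterministicDelayProvider implements TimeDelayProvider {
    private final int base;
    private final long maxBackOffTime;

    DeterministicDelayProvider(int base, long maxBackOffTime) {
        this.base = base;
        this.maxBackOffTime = maxBackOffTime;
    }

    @Override
    public long getDelay(int noAttempts) {
        // Same formula as the article's provider, minus the random extra delay.
        return (long) Math.min(Math.pow(base, noAttempts) * 1000, maxBackOffTime);
    }

    @Override
    public long maxBackoff() {
        return maxBackOffTime;
    }

    public static void main(String[] args) {
        TimeDelayProvider provider = new DeterministicDelayProvider(2, 10_000);
        // Prints 2000, 4000, 8000 and then 10000 ms (16000 ms is capped at 10000 ms).
        for (int attempt = 1; attempt <= 4; attempt++) {
            System.out.println("attempt " + attempt + " -> " + provider.getDelay(attempt) + " ms");
        }
    }
}
```

With the jitter of the article's provider, each uncapped value would land somewhere in the following 999 ms, for example between 2000 and 2999 ms for the first retry.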
Next, we create a sender that sends requests to an HTTP server (which we will create shortly). The response is a string, either "SUCCESS" or "FAILURE":
```java
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
// Assumption: Apache HttpClient 4.x package names.
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RetrySender {
    private static final Logger log = LoggerFactory.getLogger(RetrySender.class);
    // Plain HTTP, because the test server is a com.sun.net.httpserver.HttpServer without TLS.
    private static final String API_ENDPOINT = "http://127.0.0.1:8080/retry";
    private static final CloseableHttpClient client = HttpClientBuilder.create().build();
    private final BackoffStrategy backOffStrategy;
    private final int maxAttempts;

    public RetrySender(BackoffStrategy backOffStrategy, int maxAttempts) {
        this.backOffStrategy = backOffStrategy;
        this.maxAttempts = maxAttempts;
    }

    public String getStatus(String uuid, int succeedOn) {
        String endpoint = String.format("%s?uuid=%s&succeedWhen=%d", API_ENDPOINT, uuid, succeedOn);
        // Keep resending until the response equals "SUCCESS" or maxAttempts is reached.
        Optional<String> maybeResponse =
                backOffStrategy.get(() -> sendRequest(endpoint), "SUCCESS"::equals, maxAttempts);
        return maybeResponse.orElse("FAILURE");
    }

    private String sendRequest(String endpoint) {
        HttpGet httpGet = new HttpGet(endpoint);
        try (CloseableHttpResponse response = client.execute(httpGet)) {
            InputStream is = response.getEntity().getContent();
            return new String(is.readAllBytes(), StandardCharsets.UTF_8);
        } catch (Exception e) {
            // Any transport error counts as a failed attempt.
            log.error("Fail to execute request to: {}", endpoint, e);
            return "FAILURE";
        }
    }
}
```
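Note that it takes the BackoffStrategy we created earlier. The getStatus method takes two parameters: the first is the UUID that identifies the request sent to the server, and the second, succeedOn (sent as the succeedWhen query parameter), is the number of attempts needed before the request succeeds. For example, if we pass 3, the success message is returned on the third attempt.
Inside the method we call the BackoffStrategy with the predicate "SUCCESS"::equals, which evaluates to true once the response is "SUCCESS". The sendRequest method simply sends the request to our HTTP server and returns the response as a string.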
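The same call pattern can be tried without any HTTP traffic. The following sketch is a condensed stand-in, not the article's classes: `RetryDemo`, `retry` and `flaky` are hypothetical names, the delay function is passed in as a lambda, and the delays are in milliseconds so the demo finishes quickly.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntToLongFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical, self-contained stand-in for the strategy plus a fake server.
final class RetryDemo {

    /** Retries the supplier until the predicate passes or maxAttempts is reached. */
    static <T> Optional<T> retry(Supplier<T> supplier, Predicate<T> predicate,
                                 int maxAttempts, IntToLongFunction delayMillis) {
        T value = supplier.get();
        int attempts = 1;
        while (!predicate.test(value) && attempts < maxAttempts) {
            try {
                Thread.sleep(delayMillis.applyAsLong(attempts));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
            value = supplier.get();
            attempts++;
        }
        return Optional.ofNullable(value);
    }

    /** Fake "server": answers FAILURE until the succeedOn-th call, then SUCCESS. */
    static Supplier<String> flaky(AtomicInteger calls, int succeedOn) {
        return () -> calls.incrementAndGet() >= succeedOn ? "SUCCESS" : "FAILURE";
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Optional<String> result = retry(flaky(calls, 3), "SUCCESS"::equals, 5,
                attempt -> (long) Math.pow(2, attempt)); // 2 ms, 4 ms, ... to keep the demo fast
        // Prints "SUCCESS after 3 calls".
        System.out.println(result.orElse("FAILURE") + " after " + calls.get() + " calls");
    }
}
```

Passing the supplier and the predicate separately keeps the retry loop independent of what is being retried, which is the same design choice the BackoffStrategy interface makes.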
Next, we create our HTTP server, using the existing HttpServer class from the com.sun.net.httpserver package:
```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;

public class SimpleHttpServer {
    private final SimpleHttpHandler handler;

    public SimpleHttpServer(SimpleHttpHandler handler) {
        this.handler = handler;
    }

    public void run() throws IOException {
        // Listen on 127.0.0.1:8080 and route /retry to our handler.
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 8080), 0);
        server.createContext("/retry", handler);
        server.start();
    }
}
```
To handle the requests coming from the RetrySender client, the RequestHandler interface has two methods:
```java
public interface RequestHandler {
    String handle(RetryRequest request);

    int getTries(String uuid);
}
```
The handle method returns "SUCCESS" or "FAILURE" depending on the provided RetryRequest, while getTries gives us the number of requests received for a given uuid:
```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RetryRequestHandler implements RequestHandler {
    private final Map<String, Integer> idToRequests = new ConcurrentHashMap<>();

    @Override
    public String handle(RetryRequest request) {
        String uuid = request.uuid();
        // merge returns the updated count, so the increment and the read are one atomic step.
        int numRequests = idToRequests.merge(uuid, 1, Integer::sum);
        if (numRequests >= request.successWhen() && request.successWhen() != -1) {
            return "SUCCESS";
        } else {
            return "FAILURE";
        }
    }

    @Override
    public int getTries(String uuid) {
        // An unknown uuid has been requested zero times.
        return this.idToRequests.getOrDefault(uuid, 0);
    }
}
```
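In handle, every time a new request arrives we check whether its UUID is already known and update the map accordingly. To simulate failing requests so that we can exercise our retry logic, we return "FAILURE" while the total number of requests is below the successWhen threshold (explained above) or whenever successWhen is -1; otherwise we return "SUCCESS".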
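The article does not show RetryRequest or SimpleHttpHandler. The sketch below is one way they could look, inferred from how they are used: the accessors uuid() and successWhen() suggest a record, and the handler has to turn the uuid and succeedWhen query parameters into a RetryRequest. Everything here is an assumption for illustration (including the `parse` helper), and it does no error handling for missing or malformed parameters.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Assumed shape, inferred from the calls request.uuid() and request.successWhen().
record RetryRequest(String uuid, int successWhen) {}

// Repeated from the article so that this sketch compiles on its own.
interface RequestHandler {
    String handle(RetryRequest request);
    int getTries(String uuid);
}

// Hypothetical handler; the article's own SimpleHttpHandler may differ.
class SimpleHttpHandler implements HttpHandler {
    private final RequestHandler requestHandler;

    SimpleHttpHandler(RequestHandler requestHandler) {
        this.requestHandler = requestHandler;
    }

    /** Parses a query string such as "uuid=abc&succeedWhen=3" into a RetryRequest. */
    static RetryRequest parse(String query) {
        Map<String, String> params = new HashMap<>();
        for (String pair : query.split("&")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2) {
                params.put(kv[0], kv[1]);
            }
        }
        return new RetryRequest(params.get("uuid"), Integer.parseInt(params.get("succeedWhen")));
    }

    @Override
    public void handle(HttpExchange exchange) throws IOException {
        // Delegate the decision to the RequestHandler and write its answer as the response body.
        String body = requestHandler.handle(parse(exchange.getRequestURI().getQuery()));
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        exchange.sendResponseHeaders(200, bytes.length);
        try (OutputStream os = exchange.getResponseBody()) {
            os.write(bytes);
        }
    }
}
```

In this sketch the server always answers with status 200 and signals the outcome in the body, which matches the client comparing the response text against "SUCCESS".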
Finally, we are ready to write some tests for our code. First, we define some constants and the necessary dependencies:
```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import java.util.UUID;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

class ExponentialBackoffStrategyTest {
    static int MAX_ATTEMPTS = 5;
    // Base 2, with delays capped at 10 000 ms.
    static TimeDelayProvider timeDelayProvider = new ExponentialTimeDelayProvider(2, 10_000);
    static BackoffStrategy exponentialBackoffStrategy = new ExponentialBackoffStrategy(timeDelayProvider);
    static RetrySender retrySender = new RetrySender(exponentialBackoffStrategy, MAX_ATTEMPTS);
    static RequestHandler requestHandler = new RetryRequestHandler();
    static SimpleHttpHandler simpleHttpHandler = new SimpleHttpHandler(requestHandler);
    static SimpleHttpServer simpleHttpServer = new SimpleHttpServer(simpleHttpHandler);
    static RetryRequest succeededFirstTry = new RetryRequest(UUID.randomUUID().toString(), 1);
    static RetryRequest succeededMoreTries = new RetryRequest(UUID.randomUUID().toString(), 3);
    static RetryRequest failOnAllTries = new RetryRequest(UUID.randomUUID().toString(), -1);
    static String SUCCESS = "SUCCESS";
    static String FAILURE = "FAILURE";

    @BeforeAll
    static void init() throws IOException {
        simpleHttpServer.run();
    }
}
```
The first test method checks a request that succeeds on the first attempt:
```java
@Test
void testSuccessfulFirstTry() {
    String status = retrySender.getStatus(succeededFirstTry.uuid(), succeededFirstTry.successWhen());
    assertEquals(1, requestHandler.getTries(succeededFirstTry.uuid()));
    assertEquals(SUCCESS, status);
}
```
The test runs and passes, and we get a log like this:
```
Total requests have tried: 1
```
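Next, we create a request that succeeds only on the third attempt. After each failure the delay grows exponentially until it reaches the upper bound, which we set to 10 seconds in our case: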
```java
@Test
void testSuccessMoreTries() {
    String status = retrySender.getStatus(succeededMoreTries.uuid(), succeededMoreTries.successWhen());
    assertEquals(3, requestHandler.getTries(succeededMoreTries.uuid()));
    assertEquals(SUCCESS, status);
}
```
The test passes, and we get these logs:
```
Predicate tested fail!! Retry in: 2161ms
Predicate tested fail!! Retry in: 4785ms
Total requests have tried: 3
```
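Finally, we test the case where the request never succeeds: the same request is sent repeatedly, MAX_ATTEMPTS times in total, and then "FAILURE" is returned: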
```java
@Test
void testAllFails() {
    String status = retrySender.getStatus(failOnAllTries.uuid(), failOnAllTries.successWhen());
    assertEquals(MAX_ATTEMPTS, requestHandler.getTries(failOnAllTries.uuid()));
    assertEquals(FAILURE, status);
}
```
The delay before each subsequent request roughly doubles every time until it hits the 10-second cap. Here are the logs from this last test method:
```
Predicate tested fail!! Retry in: 2987ms
Predicate tested fail!! Retry in: 4422ms
Predicate tested fail!! Retry in: 8447ms
Predicate tested fail!! Retry in: 10000ms
Total requests have tried: 5
```
The code sample is available here.