使用Apache'HttpClient'在PUT操作期间更快地检测到中断连接

我正在使用Apache HttpClient 4与REST API进行通信,而且大部分时间我都在执行冗长的PUT操作。 由于这些可能发生在一个不稳定的互联网连接上,我需要检测连接是否中断,并可能需要重试(带有恢复请求)。

为了在真实世界中尝试我的例程,我开始了一个PUT操作,然后翻转了我的笔记本电脑的Wi-Fi开关,导致所有数据stream的即时完全中断。 然而,在最终抛出一个SocketException之前,需要花费大概5分钟左右的时间。

我怎样才能加快处理? 我想设置一个30秒左右的超时时间。

更新:

为了澄清,我的要求是一个PUT操作。 所以很长一段时间(可能是几个小时),唯一的操作是write()操作,没有读操作。 read()操作有一个超时设置 ,但是我找不到一个写操作。

我正在使用我自己的实体实现,因此我直接写入一个OutputStream,这将几乎立即阻止一旦Internet连接中断。 如果OutputStreams有一个超时参数,所以我可以写out.write(nextChunk, 30000); 我自己可以发现这样的问题。 其实我试过了:

 public class TimeoutHttpEntity extends HttpEntityWrapper { public TimeoutHttpEntity(HttpEntity wrappedEntity) { super(wrappedEntity); } @Override public void writeTo(OutputStream outstream) throws IOException { try(TimeoutOutputStreamWrapper wrapper = new TimeoutOutputStreamWrapper(outstream, 30000)) { super.writeTo(wrapper); } } } public class TimeoutOutputStreamWrapper extends OutputStream { private final OutputStream delegate; private final long timeout; private final ExecutorService executorService = Executors.newSingleThreadExecutor(); public TimeoutOutputStreamWrapper(OutputStream delegate, long timeout) { this.delegate = delegate; this.timeout = timeout; } @Override public void write(int b) throws IOException { executeWithTimeout(() -> { delegate.write(b); return null; }); } @Override public void write(byte[] b) throws IOException { executeWithTimeout(() -> { delegate.write(b); return null; }); } @Override public void write(byte[] b, int off, int len) throws IOException { executeWithTimeout(() -> { delegate.write(b, off, len); return null; }); } @Override public void close() throws IOException { try { executeWithTimeout(() -> { delegate.close(); return null; }); } finally { executorService.shutdown(); } } private void executeWithTimeout(final Callable<?> task) throws IOException { try { executorService.submit(task).get(timeout, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { throw new IOException(e); } catch (ExecutionException e) { final Throwable cause = e.getCause(); if (cause instanceof IOException) { throw (IOException)cause; } throw new Error(cause); } catch (InterruptedException e) { throw new Error(e); } } } public class TimeoutOutputStreamWrapperTest { private static final byte[] DEMO_ARRAY = new byte[]{1,2,3}; private TimeoutOutputStreamWrapper streamWrapper; private OutputStream delegateOutput; public void setUp(long timeout) { delegateOutput = mock(OutputStream.class); streamWrapper = new TimeoutOutputStreamWrapper(delegateOutput, timeout); } @AfterMethod public void teardown() throws Exception { streamWrapper.close(); } @Test public void write_writesByte() throws Exception { // Setup setUp(Long.MAX_VALUE); // Execution streamWrapper.write(DEMO_ARRAY); // Evaluation verify(delegateOutput).write(DEMO_ARRAY); } @Test(expectedExceptions = DemoIOException.class) public void write_passesThruException() throws Exception { // Setup setUp(Long.MAX_VALUE); doThrow(DemoIOException.class).when(delegateOutput).write(DEMO_ARRAY); // Execution streamWrapper.write(DEMO_ARRAY); // Evaluation performed by expected exception } @Test(expectedExceptions = IOException.class) public void write_throwsIOException_onTimeout() throws Exception { // Setup final CountDownLatch executionDone = new CountDownLatch(1); setUp(100); doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { executionDone.await(); return null; } }).when(delegateOutput).write(DEMO_ARRAY); // Execution try { streamWrapper.write(DEMO_ARRAY); } finally { executionDone.countDown(); } // Evaluation performed by expected exception } public static class DemoIOException extends IOException { } } 

这有些复杂,但是在我的unit testing中效果很好。 而且它在现实生活中也是如此,除了HttpRequestExecutor捕获127行中的exception并尝试closures连接。 但是,当试图closures连接时,它首先尝试刷新连接再次阻塞。

我可能能够深入了解HttpClient,并弄清楚如何防止这种刷新操作,但这已经不是一个非常漂亮的解决scheme,而且会变得更糟。

更新

看起来这不能在Java级别上完成。 我可以在另一个级别上做吗? (我正在使用Linux)。

Java阻塞I / O不支持写入操作的套接字超时。 你完全由OS / JRE的摆布来解锁被写操作阻塞的线程。 而且,这个行为往往是OS / JRE特有的。

这可能是考虑使用基于非阻塞I / O(NIO)(如Apache HttpAsyncClient )的HTTP客户端的合理情况。

您可以使用RequestConfig配置套接字超时:

 RequestConfig myRequestConfig = RequestConfig.custom() .setSocketTimeout(5000) // 5 seconds .build(); 

当你打电话时,只需分配你的新配置。 例如,

 HttpPut httpPut = new HttpPut("..."); httpPut.setConfig(requestConfig); ... HttpClientContext context = HttpClientContext.create(); .... httpclient.execute(httpPut, context); 

有关更多信息重新配置超时配置, 这里有一个很好的解释。

她是我遇到的connection eviction policy : 在这里

 public static class IdleConnectionMonitorThread extends Thread { private final HttpClientConnectionManager connMgr; private volatile boolean shutdown; public IdleConnectionMonitorThread(HttpClientConnectionManager connMgr) { super(); this.connMgr = connMgr; } @Override public void run() { try { while (!shutdown) { synchronized (this) { wait(5000); // Close expired connections connMgr.closeExpiredConnections(); // Optionally, close connections // that have been idle longer than 30 sec connMgr.closeIdleConnections(30, TimeUnit.SECONDS); } } } catch (InterruptedException ex) { // terminate } } public void shutdown() { shutdown = true; synchronized (this) { notifyAll(); } }} 

我想你可能想看看这个。