如果 Http 连接失败(例如服务器关闭),我需要减少计数器。
我本来想说“是的”,但在这句话之后我有点不太确定。我认为你想做这样的事情:
定义发送请求(网址)
request = 对 url 的新请求
request.header["X-count"] = 下一个序列
如果 request.send() != 成功
倒带序列
在这种情况下,我猜不应该允许两个线程同时发送请求,然后你想要序列化请求的东西而不是AtomicInteger,它实际上只是让你原子地执行一些操作。如果两个线程同时调用sendRequest,而第一个线程失败,就会发生这种情况:
线程 |发生什么了?
--------+-------------
一个 |创建新请求
乙|创建新请求
一个 |设置请求["X-count"] = 0
一个 |将计数器递增到 1
一个 |发送请求
乙|设置请求["X-count"] = 1
乙|将计数器递增到 2
乙|发送请求
一个 |请求失败
乙|请求成功
一个 |倒回计数器到 1
C |创建新请求
C |设置请求["X-count"] = 1
C |将计数器递增到 2
现在,您已经发送了两个 X-count = 1 的请求。如果您想避免这种情况,您应该使用类似的东西(假设 Request 和 Response 是用于处理对 URL 的请求的类):
class SerialRequester {
private volatile int currentSerial = 0;
public synchronized Response sendRequest(URL url) throws SomeException {
Request request = new Request(url);
request.setHeader("X-count", currentSerial);
Response response = request.send();
if (response.isSuccess()) ++currentSerial;
return response;
}
}
该类保证没有两个成功的请求(通过相同的SerialRequester 发出)具有相同的 X-count 值。
编辑 许多人似乎担心上述解决方案不能同时运行。它没有。这是正确的。但它需要以这种方式解决 OP 的问题。现在,如果请求失败时计数器不需要递减,AtomicInteger 将是完美的,但在这种情况下是不正确的。
编辑 2 我得到它来编写一个不太容易冻结的串行请求程序(如上面的那个),这样如果请求等待时间过长(即排队),它就会中止请求在工作线程中但未启动)。因此,如果管道阻塞并且一个请求挂起很长时间,其他请求将最多等待固定的时间,因此队列不会无限增长,直到阻塞消失。
class SerialRequester {
private enum State { PENDING, STARTED, ABORTED }
private final ExecutorService executor =
Executors.newSingleThreadExecutor();
private int currentSerial = 0; // not volatile, used from executor thread only
public Response sendRequest(final URL url)
throws SomeException, InterruptedException {
final AtomicReference<State> state =
new AtomicReference<State>(State.PENDING);
Future<Response> result = executor.submit(new Callable<Response>(){
@Override
public Result call() throws SomeException {
if (!state.compareAndSet(State.PENDING, State.STARTED))
return null; // Aborted by calling thread
Request request = new Request(url);
request.setHeader("X-count", currentSerial);
Response response = request.send();
if (response.isSuccess()) ++currentSerial;
return response;
}
});
try {
try {
// Wait at most 30 secs for request to start
return result.get(30, TimeUnit.SECONDS);
} catch (TimeoutException e){
// 30 secs passed; abort task if not started
if (state.compareAndSet(State.PENDING, State.ABORTED))
throw new SomeException("Request queued too long", e);
return result.get(); // Task started; wait for completion
}
} catch (ExecutionException e) { // Network timeout, misc I/O errors etc
throw new SomeException("Request error", e);
}
}
}