【问题标题】:How to retrieve more than 100 messages from the history of a PubNub channel?如何从 PubNub 频道的历史记录中检索超过 100 条消息?
【发布时间】:2014-01-07 16:11:14
【问题描述】:

关于PubNub History API 的页面指出

history() 函数返回一个最多包含 100 条消息的列表,以及起始时间令牌和结束时间令牌。

有没有办法检索超过 100 条消息?

我目前不是 PubNub 的付费客户。

【问题讨论】:

  • 你用什么语言编程?
  • 有几个选项。你想要一个完整的历史转储吗?还是您只想要过去 24 小时的快照?
  • 我继续用 JavaScript 给你两个答案。
  • @PubNub,非常感谢您的全面回答!这正是我所需要的。
  • 不客气!

标签: history pubnub


【解决方案1】:

PubNub 加载历史超过 100 条消息

有时您希望沿时间轴回溯,对线性数据流进行切片。通常您会希望在不同的粒度级别上执行此操作。这就是 PubNub 存储和回放(Storage and Playback)API 提供最大灵活性的原因。然而,要按照期望的结果集加载数据,有时会有点棘手。

PubNub Real-Time Network Storage and Playback

在跨时间线加载交易历史记录时,您可能需要考虑几个因素,这些时间轴可能跨越交易集中的数百万条消息。有一些不错的选择可供您使用,我们现在将介绍其中的两个。这些示例将使用 JavaScript 编码。第一个示例通过抓取过去 24 小时内每个小时开始的快照来加载数据摘要。第二个示例向您展示了如何加载所有交易的完整细节和最大粒度。

所有参考文件都可以在这个GIST: Loading History from PubNub Mt.Gox Trades找到

PubNub Mt.Gox 历史 JavaScript 用法示例

<script src="https://cdn.pubnub.com/pubnub.min.js"></script>
<script src="mtgox-history.js"></script>
<script>(function(){

// CHANNEL QUERIED BY BOTH REQUESTS BELOW
var ticker_channel = 'd5f06780-30a8-4a48-a2f8-7ed181b4a13f';

// SHARED FAILURE HANDLER: BOTH LOADERS REPORT FAILURES THE SAME WAY
function report_network_error() { console.log("NETWORK ERROR") }

// HOURLY SUMMARY: RESULT IS PRINTED AS A JSON STRING
MTGOX.history.hourly({
    channel : ticker_channel,
    data    : function(response) { console.log(JSON.stringify(response)) },
    error   : report_network_error
});

// FULL HISTORY: RAISE `limit` TO LOAD MORE MESSAGES
MTGOX.history.full({
    limit   : 500,
    channel : ticker_channel,
    data    : function(messages) { console.log(messages) },
    error   : report_network_error
});

})();</script>

注意:运行MTGOX.history.hourly() 方法将生成过去24 小时内每小时的快照列表。

注意:运行MTGOX.history.full() 方法将生成包含大量数据的最高分辨率细节。您可以根据需要获取完整转储或部分转储;并且您应该增加limit 参数以获取更多数据点。

以下 JavaScript 文件将为您提供MTGOX 接口。

PubNub Mt.Gox 历史 JavaScript 加载器

//
// mtgox-history.js
//

(function(){

// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
// PUBNUB CLIENT SHARED BY BOTH LOADERS (ONLY A SUBSCRIBE KEY IS CONFIGURED)
// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
var pubnub = PUBNUB.init({ subscribe_key : 'sub-c-50d56e1e-2fd9-11e3-a041-02ee2ddab7fe' });

// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
// PUBLIC API: window.MTGOX.history.hourly() AND window.MTGOX.history.full()
// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
var history_loaders = { hourly : hourly, full : full };
window.MTGOX = { history : history_loaders };

// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
// GET ALL DATA FOREVER (WITH LIMIT OF COURSE)
// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

/*
MTGOX.history.full({
    limit   : 1000,
    channel : 'd5f06780-30a8-4a48-a2f8-7ed181b4a13f',
    data    : function(messages) { console.log(messages)        },
    error   : function(e)        { console.log("NETWORK ERROR") }
});
*/

/**
 * Load a channel's stored messages page by page, newest first, until
 * `limit` messages have been collected or the history is exhausted.
 *
 * Each request asks pubnub.history() for one page; the start timetoken
 * returned with a page (element 1 of the response) is fed back as `start`
 * of the next request, which walks the history backwards in time.
 *
 * @param {Object}   [args]
 * @param {string}   [args.channel] Channel to read; defaults to the
 *                                  channel id hard-coded below.
 * @param {number}   [args.limit]   Maximum number of messages to deliver
 *                                  (default 5000). Never exceeded.
 * @param {Function} [args.data]    Called once with the collected messages
 *                                  (newest first). On a network error it is
 *                                  called with whatever was collected so far.
 * @param {Function} [args.error]   Called as error(e, partialHistory) when a
 *                                  page request fails.
 */
function full(args) {
    // Tolerate a call without any options object.
    args = args || {};

    // Page size requested from pubnub.history(); the question this answers
    // states that history() returns at most 100 messages per call.
    var PAGE_SIZE = 100
    ,   chan      = args['channel'] || 'd5f06780-30a8-4a48-a2f8-7ed181b4a13f'
    ,   callback  = args['data']    || function(){}
    ,   error     = args['error']   || function(){}
    ,   limit     = +args['limit']  || 5000
    ,   history   = []
    ,   params    = {
            channel  : chan,
            count    : PAGE_SIZE,
            callback : function(messages) {
                var msgs    = (messages && messages[0]) || []
                ,   fetched = msgs.length;

                // A page arrives oldest-first; append it newest-first and
                // stop as soon as the caller's limit is reached.
                for (var i = fetched - 1; i >= 0 && history.length < limit; i--) {
                    history.push(msgs[i]);
                }

                // Done when the limit is met or a short page signals that
                // there is nothing older left.
                if (history.length >= limit || fetched < PAGE_SIZE) {
                    return callback(history);
                }

                // Continue from the start timetoken of the page just read.
                params.start = messages[1];
                add_messages();
            },
            error : function(e) {
                // Deliver the partial result, then report the failure with
                // the actual error object first.
                callback(history);
                error(e, history);
            }
        };

    add_messages();
    function add_messages() { pubnub.history(params) }
}

// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
// GET 24 HOURS IN HOURLY INCREMENTS
// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

/*
MTGOX.history.hourly({
    channel : 'd5f06780-30a8-4a48-a2f8-7ed181b4a13f',
    data    : function(response) { console.log(response) },
    error   : function()         { console.log('ERROR')  } 
});
*/

/**
 * Load one ticker snapshot per hour for 24 consecutive hourly marks.
 *
 * The marks are anchored at 00:00:00.000 UTC of the current day and step
 * backwards one hour at a time (offsets 0..23). For every mark a single
 * message is requested from pubnub.history() with `start` set to that
 * instant, and its `ticker.avg.value` is recorded.
 *
 * @param {Object}   [setup]
 * @param {string}   [setup.channel] Channel to read; defaults to the
 *                                   channel id hard-coded below.
 * @param {Function} [setup.data]    Called exactly once, after all 24
 *                                   requests have completed (successfully or
 *                                   not), with an array of
 *                                   [utcHourOfDay, value] pairs sorted by
 *                                   hour in descending order. Hours without
 *                                   a usable value are omitted.
 * @param {Function} [setup.error]   Called (without arguments) once for
 *                                   every request that fails.
 */
function hourly(setup) {
    // Tolerate a call without any options object.
    setup = setup || {};

    var HOUR_MS = 3600000;
    var limit   = 24;
    var count   = 0;
    var chan    = setup['channel'] || 'd5f06780-30a8-4a48-a2f8-7ed181b4a13f';
    var cb      = setup['data']    || function(){};
    var eb      = setup['error']   || function(){};
    var now     = new Date();

    // Truncate to midnight UTC of the current day.
    now.setUTCHours(0, 0, 0, 0);

    var utc_now = now.getTime();
    var vectors = [];

    // Count one finished request (success or failure); once all of them are
    // in, sort a single time and hand the result over. Counting AFTER the
    // response has been recorded keeps the last-arriving snapshot.
    function finish_one() {
        if (++count < limit) return;
        vectors.sort(function(a, b) { return b[0] - a[0] });
        cb(vectors);
    }

    function request_hour(d) {
        var hour_start = utc_now - HOUR_MS * d;
        pubnub.history({
            limit    : 1,
            channel  : chan,
            // PubNub timetokens are in 10^-7 s units: milliseconds * 10000.
            start    : hour_start * 10000,
            error    : function() { eb(); finish_one(); },
            callback : function(messages) {
                // Guard every level: the response or the page may be empty.
                var newest = ((messages || [])[0] || [])[0] || {};
                var res    = +((newest.ticker || {}).avg || {}).value;

                // Skip missing, zero, or non-numeric values.
                if (res) vectors.push([ new Date(hour_start).getUTCHours(), res ]);

                finish_one();
            }
        });
    }

    for (var d = 0; d < limit; d++) request_hour(d);
}

})();

用于行情(Ticker)、深度(Depth)和交易(Trade)的 Mt.Gox PubNub 频道列表

以下是 Mt.Gox 数据馈送选项提供的频道列表,您可以在历史channel 参数字段中使用。

{
"TICKER.ltcgbp": "0102A446-E4D4-4082-8E83-CC02822F9172",
"TICKER.ltccny": "0290378C-E3D7-4836-8CB1-2BFAE20CC492",
"DEPTH.btchkd": "049F65DC-3AF3-4FFD-85A5-AAC102B2A579",
"DEPTH.btceur": "057BDC6B-9F9C-44E4-BC1A-363E4443CE87",
"TICKER.nmcaud": "08C65460-CBD9-492E-8473-8507DFA66AE6",
"TICKER.btceur": "0BB6DA8B-F6C6-4ECF-8F0D-A544AD948C15",
"DEPTH.btckrw": "0C84BDA7-E613-4B19-AE2A-6D26412C9F70",
"DEPTH.btccny": "0D1ECAD8-E20F-459E-8BED-0BDCF927820F",
"TICKER.btccad": "10720792-084D-45BA-92E3-CF44D9477775",
"DEPTH.btcchf": "113FEC5F-294D-4929-86EB-8CA4C3FD1BED",
"TICKER.ltcnok": "13616AE8-9268-4A43-BDF7-6B8D1AC814A2",
"TICKER.ltcusd": "1366A9F3-92EB-4C6C-9CCC-492A959ECA94",
"TICKER.btcbtc": "13EDFF67-CFA0-4D99-AA76-52BD15D6A058",
"TICKER.ltccad": "18B55737-3F5C-4583-AF63-6EB3951EAD72",
"TICKER.nmccny": "249FDEFD-C6EB-4802-9F54-064BC83908AA",
"DEPTH.btcusd": "24E67E0D-1CAD-4CC0-9E7A-F8523EF460FE",
"TICKER.btcchf": "2644C164-3DB7-4475-8B45-C7042EFE3413",
"DEPTH.btcaud": "296EE352-DD5D-46F3-9BEA-5E39DEDE2005",
"TICKER.btcczk": "2A968B7F-6638-40BA-95E7-7284B3196D52",
"TICKER.btcsgd": "2CB73ED1-07F4-45E0-8918-BCBFDA658912",
"TICKER.nmcjpy": "314E2B7A-A9FA-4249-BC46-B7F662ECBC3A",
"TICKER.btcnmc": "36189B8C-CFFA-40D2-B205-FB71420387AE",
"DEPTH.btcinr": "414FDB18-8F70-471C-A9DF-B3C2740727EA",
"DEPTH.btcsgd": "41E5C243-3D44-4FAD-B690-F39E1DBB86A8",
"TICKER.btcltc": "48B6886F-49C0-4614-B647-BA5369B449A9",
"TICKER.ltceur": "491BC9BB-7CD8-4719-A9E8-16DAD802FFAC",
"TICKER.btcinr": "55E5FEB8-FEA5-416B-88FA-40211541DECA",
"TICKER.ltcjpy": "5AD8E40F-6DF3-489F-9CF1-AF28426A50CF",
"DEPTH.btccad": "5B234CC3-A7C1-47CE-854F-27AEE4CDBDA5",
"TICKER.btcnzd": "5DDD27CA-2466-4D1A-8961-615DEDB68BF1",
"DEPTH.btcgbp": "60C3AF1B-5D40-4D0E-B9FC-CCAB433D2E9C",
"DEPTH.btcnok": "66DA7FB4-6B0C-4A10-9CB7-E2944E046EB5",
"DEPTH.btcthb": "67879668-532F-41F9-8EB0-55E7593A5AB8",
"TICKER.btcsek": "6CAF1244-655B-460F-BEAF-5C56D1F4BEA7",
"TICKER.btcnok": "7532E866-3A03-4514-A4B1-6F86E3A8DC11",
"TICKER.btcgbp": "7B842B7D-D1F9-46FA-A49C-C12F1AD5A533",
"TRADE.LAG": "85174711-BE64-4DE1-B783-0628995D7914",
"DEPTH.btcsek": "8F1FEFAA-7C55-4420-ADA0-4DE15C1C38F3",
"DEPTH.btcdkk": "9219ABB0-B50C-4007-B4D2-51D1711AB19C",
"DEPTH.btcjpy": "94483E07-D797-4DD4-BC72-DC98F1FD39E3",
"TICKER.nmcusd": "9AAEFD15-D101-49F3-A2FD-6B63B85B6BED",
"TICKER.ltcaud": "A046600A-A06C-4EBF-9FFB-BDC8157227E8",
"TICKER.btcjpy": "A39AE532-6A3C-4835-AF8C-DDA54CB4874E",
"DEPTH.btcczk": "A7A970CF-4F6C-4D85-A74E-AC0979049B87",
"TICKER.ltcdkk": "B10A706E-E8C7-4EA8-9148-669F86930B36",
"TICKER.btcpln": "B4A02CB3-2E2D-4A88-AEEA-3C66CB604D01",
"TEST": "BAD99F24-FA8B-4938-BFDF-0C1831FC6665",
"TICKER.btcrub": "BD04F720-3C70-4DCE-AE71-2422AB862C65",
"TICKER.nmcgbp": "BF5126BA-5187-456F-8AE6-963678D0607F",
"TICKER.btckrw": "BF85048D-4DB9-4DBE-9CA3-5B83A1A4186E",
"TICKER.btccny": "C251EC35-56F9-40AB-A4F6-13325C349DE4",
"DEPTH.btcnzd": "CEDF8730-BCE6-4278-B6FE-9BEE42930E95",
"TICKER.btchkd": "D3AE78DD-01DD-4074-88A7-B8AA03CD28DD",
"TICKER.btcthb": "D58E3B69-9560-4B9E-8C58-B5C0F3FDA5E1",
"TICKER.btcusd": "D5F06780-30A8-4A48-A2F8-7ED181B4A13F",
"DEPTH.btcrub": "D6412CA0-B686-464C-891A-D1BA3943F3C6",
"TICKER.nmceur": "D8512D04-F262-4A14-82F2-8E5C96C15E68",
"TRADE.btc": "DBF1DEE9-4F2E-4A08-8CB7-748919A71B21",
"TICKER.nmccad": "DC28033E-7506-484C-905D-1C811A613323",
"DEPTH.btcpln": "E4FF055A-F8BF-407E-AF76-676CAD319A21",
"TICKER.btcdkk": "E5CE0604-574A-4059-9493-80AF46C776B3",
"TICKER.btcaud": "EB6AAA11-99D0-4F64-9E8C-1140872A423D"
}

【讨论】:

  • 我试图在 Java 中做到这一点。通过您的 javascript 示例,我发现了我的问题。我必须将毫秒时间转换为 10 微秒时间:long pubnubTime = standardJavaMillisTime * 10000。如果您在 Pubnub.history(final String channel, long start, long end, int count, boolean reverse, final Callback callback) 方法的文档中包含此信息,可能会为 Java 开发人员节省一些时间。
  • 很好,丹尼尔,我已将其添加到我们的待办事项列表中。
【解决方案2】:

https://help.pubnub.com/entries/24113341-How-do-I-Page-Through-Stored-Messages- 如果需要进一步的帮助,请联系 PubNub 支持 (help@...)

【讨论】:

    【解决方案3】:

    下面的 Java 类可用于通过如下形式的增强 for 循环,轻松检索和处理较长时间范围内的历史消息:

    // start and end are PubNub timetokens: 10^-7 second units, i.e. epoch milliseconds * 10000
    PubnubHistoryExcerpt history = new PubnubHistoryExcerpt(pubnub, channel, start, end);
    for (Object message : history) {
        // do something with the message object
    }
    

    消息是即时检索的,因此不会出现内存问题。

    下面是完整的代码。您可以在类内部的main() 方法中找到完全可运行的使用示例。

    我还没有对这个类进行广泛的测试。欢迎改进。

    /*
     * PubnubHistoryExcerpt.java
     * 
     * This file is distributed under the FreeBSD License:
     * 
     * Copyright (c) 2014, Daniel S. (http://stackoverflow.com/users/1838726/daniel-s)
     * All rights reserved.
     * 
     * Redistribution and use in source and binary forms, with or without
     * modification, are permitted provided that the following conditions are met: 
     * 
     * 1. Redistributions of source code must retain the above copyright notice, this
     *    list of conditions and the following disclaimer. 
     * 2. Redistributions in binary form must reproduce the above copyright notice,
     *    this list of conditions and the following disclaimer in the documentation
     *    and/or other materials provided with the distribution. 
     * 
     * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
     * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
     * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
     * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
     * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
     * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
     * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
     * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
     * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     * 
     * The views and conclusions contained in the software and documentation are those
     * of the authors and should not be interpreted as representing official policies, 
     * either expressed or implied, of the FreeBSD Project.
     */
    
    import java.math.BigDecimal;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.NoSuchElementException;
    
    import org.json.JSONArray;
    import org.json.JSONException;
    import org.json.JSONObject;
    
    import com.pubnub.api.Callback;
    import com.pubnub.api.Pubnub;
    import com.pubnub.api.PubnubError;
    
    /**
     * You can use this class to iterate over historical PubNub messages. The messages are retrieved transparently while you
     * iterate over the history excerpt. This class and the returned iterators are thread-safe.
     * 
     * See {@link #main(String[])} for a usage example.
     */
    public class PubnubHistoryExcerpt implements Iterable<Object> {

        /**
         * Usage example for this class. It downloads the last 3 hours' messages of the MtGox BTC/USD ticker channel
         * from PubNub and prints the timestamp and USD value found in each message.
         */
        public static void main(String[] args) throws JSONException {
            String PUBNUB_SUBSCRIBE_KEY_MTGOX = "sub-c-50d56e1e-2fd9-11e3-a041-02ee2ddab7fe";
            String PUBNUB_CHANNEL_MTGOX_TICKER_BTCUSD = "d5f06780-30a8-4a48-a2f8-7ed181b4a13f";

            Pubnub pubnub = new Pubnub(null, PUBNUB_SUBSCRIBE_KEY_MTGOX);

            long ONE_HOUR_IN_MILLIS = 60 * 60 * 1000;

            long end = System.currentTimeMillis();
            long start = end - 3 * ONE_HOUR_IN_MILLIS;

            // convert from milliseconds as time unit (10^-3 seconds) to
            // pubnub's better-than-microsecond precision time units (10^-7 seconds)
            start *= 10000;
            end *= 10000;

            PubnubHistoryExcerpt history = new PubnubHistoryExcerpt(pubnub, PUBNUB_CHANNEL_MTGOX_TICKER_BTCUSD, start, end);

            DefaultDateFormat dateFormat = DefaultDateFormat.create();

            for (Object message : history) {
                JSONObject messageJson = (JSONObject) message;

                JSONObject ticker = messageJson.getJSONObject("ticker");
                long instant = ticker.getLong("now");
                BigDecimal value = new BigDecimal(ticker.getJSONObject("last_local").getString("value"));

                instant /= 1000; // convert from microseconds to milliseconds

                System.out.println(dateFormat.format(instant) + ": " + value);
            }

            // NOTE(review): presumably needed because PubNub worker threads would otherwise keep the JVM alive -
            // confirm against the PubNub client in use.
            System.exit(0);
        }

        /**
         * This is the maximum number of messages to fetch in one batch. If you fetch many messages, higher numbers
         * improve performance. Setting this to a value higher than 100 doesn't have an effect, because Pubnub currently
         * doesn't support fetching more than this many messages at once.
         */
        private static final int BATCH_SIZE = 100;

        private final Pubnub pubnub;

        private final String channel;

        private final long start;

        private final long end;

        /**
         * Constructs a new excerpt over which you can iterate. Instances only describe an excerpt; no retrieval
         * operation is started until <code>hasNext()</code> or <code>next()</code> is called on an iterator.
         *
         * @param pubnub
         *            The Pubnub connection to use for retrieving messages.
         * @param channel
         *            The channel for which to retrieve historical messages.
         * @param start
         *            The beginning of the time interval for which to retrieve messages, in pubnub's time units (10^-7
         *            seconds, so milliseconds * 10000) since 1970-01-01 00:00:00.
         * @param end
         *            The end of the time interval for which to retrieve messages, in pubnub's time units (10^-7
         *            seconds, so milliseconds * 10000) since 1970-01-01 00:00:00.
         */
        public PubnubHistoryExcerpt(Pubnub pubnub, String channel, long start, long end) {
            this.pubnub = pubnub;
            this.channel = channel;
            this.start = start;
            this.end = end;
        }

        /**
         * Returns a fresh iterator which starts at the beginning of the excerpt. Each iterator fetches its own batches.
         */
        public Iterator<Object> iterator() {
            return new Iter();
        }

        private class Iter implements Iterator<Object> {

            /**
             * This list is used as a fifo buffer for messages retrieved through this iterator. Threads calling
             * <code>hasNext()</code> / <code>next()</code> synchronize on it; the Pubnub callback fills it while
             * holding {@link #notifier}.
             */
            private final LinkedList<Object> buffer = new LinkedList<Object>();

            /**
             * This field stores the end of the time range of the previous batch retrieval, in Pubnub time units (10th
             * of a microsecond, so milliseconds*10000). For the following batch retrieval, this is used as the start
             * time for retrieving the following messages.
             */
            private long prevBatchTimeRangeEnd = PubnubHistoryExcerpt.this.start;

            /**
             * Retrieval of messages is handled asynchronously. That means that exceptions which are thrown during
             * retrieval can't automatically be propagated through to the code which invokes <code>next()</code> or
             * <code>hasNext()</code>. Therefore, such an exception is stored in this field and then re-thrown (wrapped)
             * from within <code>next()</code> or <code>hasNext()</code>.
             */
            private Exception caughtDuringRetrieval = null;

            /**
             * This object is used to wait on and to notify about the completion of a batch retrieval.
             */
            private final Object notifier = new Object();

            /**
             * Because of spurious wakeups that can happen during wait(), this field is necessary to tell the waiting
             * thread if retrieval is still running. Only accessed while holding {@link #notifier}.
             */
            private boolean retrieving = false;

            /**
             * The callback object to use for retrieving messages. This is stored in a field here for re-use.
             */
            private final InternalCallback internalCallback = new InternalCallback();

            /**
             * Requests the next batch (if the excerpt's end has not been reached yet) and blocks until one of the
             * callbacks reports that the request has finished, successfully or not.
             */
            private void retrieveNextBatch() {
                synchronized (this.notifier) {
                    if (this.prevBatchTimeRangeEnd >= PubnubHistoryExcerpt.this.end)
                        return; // nothing left to fetch, so there is nothing to wait for

                    // only flag a retrieval as running when a request is actually issued
                    this.retrieving = true;

                    PubnubHistoryExcerpt.this.pubnub.history(
                            PubnubHistoryExcerpt.this.channel,
                            this.prevBatchTimeRangeEnd,
                            PubnubHistoryExcerpt.this.end,
                            BATCH_SIZE,
                            false,
                            this.internalCallback);

                    waitUntilNextBatchRetrievalFinished();
                }
            }

            /**
             * Waits on {@link #notifier} until {@link #retrieving} has been cleared. Must be called while holding the
             * notifier's monitor. An interruption does not abort the wait, but the thread's interrupt status is
             * restored afterwards so that callers can still observe it.
             */
            private void waitUntilNextBatchRetrievalFinished() {
                boolean interrupted = false;
                try {
                    while (this.retrieving) {
                        try {
                            this.notifier.wait();
                        } catch (InterruptedException e) {
                            interrupted = true;
                        }
                    }
                } finally {
                    if (interrupted)
                        Thread.currentThread().interrupt();
                }
            }

            private class InternalCallback extends Callback {

                @Override
                public void successCallback(String channel, Object message) {
                    synchronized (Iter.this.notifier) {
                        try {
                            processSuccessCallback(channel, message);
                        } catch (Exception e) {
                            Iter.this.caughtDuringRetrieval = e;
                        } finally {
                            Iter.this.retrieving = false;
                            Iter.this.notifier.notifyAll();
                        }
                    }
                }

                @Override
                public void errorCallback(String channel, PubnubError error) {
                    synchronized (Iter.this.notifier) {
                        try {
                            Iter.this.caughtDuringRetrieval = new Exception(
                                    error.getClass().getName() + ": "
                                            + error.getErrorString()
                                            + " (code=" + error.errorCode + "; extendedCode=" + error.errorCodeExtended + ")");
                        } finally {
                            // Without this, the thread blocked in waitUntilNextBatchRetrievalFinished() would never
                            // wake up after a failed request.
                            Iter.this.retrieving = false;
                            Iter.this.notifier.notifyAll();
                        }
                    }
                }

            }

            /**
             * Validates a history response, appends its messages to the buffer and advances
             * {@link #prevBatchTimeRangeEnd}. The response is read as a JSON array whose element 0 is the message list
             * and whose element 2 is the end timetoken of the batch.
             */
            private void processSuccessCallback(String channel, Object message) throws JSONException {
                if (message == null)
                    throw new NullPointerException("retrieved message is null");

                if (!(message instanceof JSONArray))
                    throw new RuntimeException("retrieved message is not a " + JSONArray.class.getName());

                JSONArray historyMessage = (JSONArray) message;

                JSONArray messageList = extractMessageList(historyMessage);

                long batchTimeRangeEnd = extractBatchTimeRangeEnd(historyMessage);
                if (batchTimeRangeEnd > 0)
                    Iter.this.prevBatchTimeRangeEnd = batchTimeRangeEnd;
                else
                    // no usable end timetoken: treat the excerpt as exhausted so that iteration terminates
                    Iter.this.prevBatchTimeRangeEnd = PubnubHistoryExcerpt.this.end;

                processMessageList(messageList);
            }

            /**
             * Appends every element of the given list to the buffer, in order.
             */
            private void processMessageList(JSONArray messageList) {
                for (int i = 0; i < messageList.length(); i++) {
                    JSONObject message;

                    try {
                        message = messageList.getJSONObject(i);
                    } catch (JSONException e) {
                        String str;
                        try {
                            str = messageList.toString(2);
                        } catch (JSONException secondaryE) {
                            str = "(couldn't convert messageList to String because of " + secondaryE.toString() + ")";
                        }
                        throw new RuntimeException("couldn't extract message at index " + i + " from messageList (messageList:\n" + str
                                + "\n(end of messageList)\n)", e);
                    }

                    Iter.this.buffer.add(message);
                }
            }

            private long extractBatchTimeRangeEnd(JSONArray historyMessage) {
                try {
                    return historyMessage.getLong(2);
                } catch (JSONException e) {
                    String str = safeConvertHistoryMessageToString(historyMessage);
                    throw new RuntimeException("could not extract element 2 (batchTimeRangeEnd) of retrieved historyMessage (historyMessage:\n" + str
                            + "\n(end of historyMessage)\n)", e);
                }
            }

            private String safeConvertHistoryMessageToString(JSONArray historyMessage) {
                try {
                    return historyMessage.toString(2);
                } catch (JSONException secondaryE) {
                    return "(couldn't convert historyMessage to String because of " + secondaryE.toString() + ")";
                }
            }

            private JSONArray extractMessageList(JSONArray historyMessage) {
                try {
                    return historyMessage.getJSONArray(0);
                } catch (JSONException e) {
                    String str = safeConvertHistoryMessageToString(historyMessage);
                    throw new RuntimeException("could not extract element 0 (messageList) of retrieved historyMessage (historyMessage:\n" + str
                            + "\n(end of historyMessage)\n)", e);
                }
            }

            /**
             * Fetches the next batch when the buffer is empty.
             *
             * @throws RuntimeException
             *             if this or a previous retrieval failed; the original exception is attached as the cause.
             */
            public boolean hasNext() {
                synchronized (Iter.this.buffer) {
                    ensureNotInExceptionState();

                    if (Iter.this.buffer.isEmpty()) {
                        retrieveNextBatch();

                        // Report a failed retrieval right away. Returning false here would end a for-each loop
                        // silently and hide the error from the caller.
                        ensureNotInExceptionState();
                    }

                    return !Iter.this.buffer.isEmpty();
                }
            }

            public Object next() {
                synchronized (Iter.this.buffer) {
                    if (!hasNext()) {
                        throw new NoSuchElementException("there are no more elements in this iterator");
                    }

                    return Iter.this.buffer.removeFirst();
                }
            }

            private void ensureNotInExceptionState() {
                if (this.caughtDuringRetrieval != null) {
                    throw new RuntimeException("retrieving history messages for this iterator failed", this.caughtDuringRetrieval);
                }
            }

            public void remove() {
                throw new UnsupportedOperationException(getClass().getName() + " doesn't support remove()");
            }
        }
    }
    

    【讨论】:

    • 不应该 errorCallback 也 notifier.notifyAll() 以释放正在等待 notifier 的其他线程吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-12-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-04
    • 2022-10-25
    相关资源
    最近更新 更多