【问题标题】:Parsing DateTime String from CSV File As Event Timestamp将 CSV 文件中的 DateTime 字符串解析为事件时间戳
【发布时间】:2020-04-17 20:05:05
【问题描述】:

使用 WSO2 SP,我的应用从以下 CSV 文件中读取行:

20170801 000001237,1.321420,1.321510,0
20170801 000001487,1.321440,1.321530,0
20170801 000001737,1.321450,1.321530,0
20170801 000001987,1.321440,1.321530,0

第一列是时间戳字符串,需要对其进行解析以对事件进行时间戳:

yyyyMMdd hhmmssfff

,其中 fff 是毫秒

我当前的应用如下所示:

@App:name('ReceiveAndCount')
@App:description('count events in csv file')
@source(type = 'file', 
    mode='line',
    tailing='false',
    file.uri = "file:/Users/A/Desktop/siddhi/wso2sp-4.4.0/data/DAT_ASCII_GBPUSD_T_201708.csv", 
    action.after.process='NONE',
        @map(type = 'csv', header='false', delimiter = ",",
        @attributes(
        dateTime = '0',
        bid='1',
        ask='2',
        ignore='3'  ) ))

define stream csvGBPUSDstream (dateTime string, bid double, ask double, ignore int);

@sink(type = 'log', priority='info')
define stream TotalCountStream (totalCount long);

-- Count the incoming events
@info(name = 'query1')
from csvGBPUSDstream 
select count() as totalCount
insert into TotalCountStream;

非常感谢任何关于将第一列中的日期字符串解析为事件时间戳的指导

【问题讨论】:

  • 使用以下内容,我解析了 dateTime 字符串,但是解析产生的时间戳被视为数据值,而不是计算时间窗口所需的时间索引。 Siddhi 附上一个事件时间戳。如何让 Siddhi 使用解析的时间戳作为索引? , . @info(name = 'Parse Timestamp') from csvGBPUSDstream select time:timestampInMilliseconds(dateTime,'yyyyMMdd HHmmssSSS') 作为时间戳,bid,ask 插入 indexedGBPUSDstream;

标签: csv timestamp etl wso2sp


【解决方案1】:

答案在于使用#window.externalTime() 函数

@App:name('CsvParseDateTime')

@App:description('从 csv 文件中解析时间戳字符串')

@source(type = 'file', mode = "line", tailing = "false", file.uri = "file:/Users/A/Desktop/siddhi/wso2sp-4.4.0/data/DAT_ASCII_GBPUSD_T_201708. csv", action.after.process = "NONE", @map(type = 'csv', header = "false", delimiter = ",", @attributes(dateTime = "0", ask = "2", ignore = "3", bid = "1"))) 定义流 csvGBPUSDstream(dateTime 字符串,bid double,ask double,忽略 int);

@sink(type = 'log', 优先级 = "info") 定义流fiveSecRangeStream(timestamp long,fiveSecRange double);

-- streat 将 dateTime 字符串解析为可以用作时间窗口索引的毫秒时间 --time:timestampInMilliseconds(date.value,date.format) yyyy-MM-dd HH:mm:ss.SSS @info(name = '解析时间戳') 来自 csvGBPUSDstream 选择时间:timestampInMilliseconds(dateTime, 'yyyyMMdd HHmmssSSS') 作为时间戳,出价,询价 插入 indexedGBPUSD 流;

-- 外部窗口时间(事件流中的时间戳) 来自 indexedGBPUSDstream#window.externalTime(timestamp,5 Sec) 选择时间戳,max(bid)-min(bid) 作为 FiveSecRange 插入到 FiveSecRangeStream; -- 适用于所有事件;

【讨论】:

    【解决方案2】:

    通过以下命令在 docker 中运行 wso2sp-editor,使用音量开关允许读取本地数据文件:

    docker run -it -p 9390:9390 -v /Users/A/Desktop/wso2sp-editor-docker-data:/home/wso2carbon/wso2sp-4.4.0/data/ --name editor wso2/wso2sp-editor:4.4.0
    

    最后是应用代码:

     @App:name('CsvParseDateTime')
    

    @App:description('从 csv 文件中解析时间戳字符串') --abrakadabra @source(type = 'file', mode = "line", tailing = "false", file.uri = "file://home/wso2carbon/wso2sp-4.4.0/data/DAT_ASCII_GBPUSD_T_201708.csv", --"文件:/Users/A/Desktop/siddhi/wso2sp-4.4.0/data/DAT_ASCII_GBPUSD_T_201708.csv", action.after.process = "NONE", @map(type = 'csv', header = "false", delimiter = ",", @attributes(dateTime = "0", ask = "2", ignore = "3", bid = "1"))) 定义流 csvGBPUSDstream(dateTime 字符串,bid double,ask double,忽略 int);

    @sink(type = 'log', 优先级 = "info") 定义流fiveSecRangeStream(timestamp long,fiveSecRange double);

    -- streat 将 dateTime 字符串解析为可以用作时间窗口索引的毫秒时间 --time:timestampInMilliseconds(date.value,date.format) yyyy-MM-dd HH:mm:ss.SSS @info(name = '解析时间戳') 来自 csvGBPUSDstream 选择时间:timestampInMilliseconds(dateTime, 'yyyyMMdd HHmmssSSS') 作为时间戳,出价,询价 插入 indexedGBPUSD 流;

    -- 外部窗口时间(事件流中的时间戳) 来自 indexedGBPUSDstream#window.externalTime(timestamp,5 Sec) 选择时间戳,max(bid)-min(bid) 作为 FiveSecRange 插入到 FiveSecRangeStream; -- 适用于所有事件;

    【讨论】:

      猜你喜欢
      • 2015-04-02
      • 1970-01-01
      • 2021-08-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多