【发布时间】:2019-10-14 06:05:44
【问题描述】:
我编写了一个 Python 3.7 脚本,该脚本异步地 (asyncio 3.4.3 and aiohttp 3.5.4) 创建一个 Salesforce 批量 API (v45.0) 作业/批处理,使用由单个 SOQL 语句每个查询的多个对象,等待批处理完成,完成后将结果下载(流式传输)到服务器,进行一些数据转换,最后将结果同步上传到SQL Server 2016 SP1 (13.0.4560.0)。我已经进行了很多成功的试运行,并认为它运行良好,但是,我最近开始间歇性地收到以下错误,并且对如何修复感到不知所措,因为关于此的报告/解决方案很少在网络上:
aiohttp.client_exceptions.ClientPayloadError:响应负载不是 完成
示例代码sn-p:
import asyncio,aiohttp,aiofiles
from simple_salesforce import Salesforce
from xml.etree import ElementTree
#Establish a session using the simple_salesforce module
sf = Salesforce(username=username,
password=password,
security_token=securityToken,
organizationId=organizationId)
sfAPIURL = 'https://myinstance.salesforce.com/services/async/45.0/job/'
sfDataPath = 'C:/Salesforce/Data/'
#Dictionary to store information for the object/job/batch while the script is executing
objectDictionary =
{'Account': {'job':
{'batch': {'id': '8596P00000ihwpJulI','results': ['8596V00000Bo9iU'],'state': 'Completed'},
'id': '8752R00000iUjtReqS'},
'soql': 'select Id,Name from Account'},
'Contact': {'job':
{'batch': {'id': '9874G00000iJnBbVgg','results': ['7410t00000Ao9vp'],'state': 'Completed'},
'id': '8800o00000POIkLlLa'},
'soql': 'select Id,Name from Contact'}}
async def retrieveResults(jobId, batchId, sfObject):
headers = {"X-SFDC-Session": sf.session_id, 'Content-Encoding': 'gzip'}
async with aiohttp.ClientSession() as session:
async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result', headers=headers) as r:
data = await r.text()
batchResults = ElementTree.fromstring(data) #list of batch results
for resultID in batchResults:
async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result/{resultID.text}', headers=headers, timeout=None) as r:
async with aiofiles.open(f'{sfDataPath}{sfObject}_TEMP_JOB_{jobId}_BATCH_{batchId}_RESULT_{resultID.text}.csv', 'wb') as outfile: #save in temporary file for manipulation later
while True:
chunk = await r.content.read(81920)
if not chunk:
break
await outfile.write(chunk)
async def asyncDownload():
await asyncio.gather(*[retrieveResults(objectDictionary[sfObject]['job']['id'], objectDictionary[sfObject]['job']['batch']['id'], sfObject) for sfObject in objectDictionary])
if __name__ == "__main__":
asyncio.run(asyncDownload())
回溯(错误行与上面的代码 sn-p 不匹配):
Traceback(最近一次调用最后一次):
文件“C:\Code\salesforce.py”,第 252 行,在 asyncio.run(asyncDownload())
文件“C:\Program Files\Python37\lib\asyncio\runners.py”,第 43 行,在 跑步 返回 loop.run_until_complete(main)
文件“C:\Program Files\Python37\lib\asyncio\base_events.py”,行 584,在 run_until_complete 返回future.result()
文件“C:\Code\salesforce.py”,第 241 行,在 asyncDownload 中 等待 asyncio.gather(*[retrieveResults(objectDictionary[sfObject]['job']['id'], objectDictionary[sfObject]['job']['batch']['id'], sfObject) 为 sfObject in objectDictionary])
文件“C:\Code\salesforce.py”,第 183 行,在 检索结果 块 = 等待 r.content.read(81920)
文件 "C:\Program Files\Python37\lib\site-packages\aiohttp\streams.py",第 369 行,在 读 等待 self._wait('read')
文件 "C:\Program Files\Python37\lib\site-packages\aiohttp\streams.py",第 297 行,在 _等待 等待服务员
aiohttp.client_exceptions.ClientPayloadError:响应负载不是 完成
问题的根源似乎始于 r.content.read(81920),它应该是 81920 字节块中的流式数据,但这是我所能得到的。
我认为这不是我的网络问题,因为有其他小型作业连接到此服务器上的外部源,在此作业运行时完成没有问题。有谁知道这里发生了什么?
谢谢!
-编辑:
我尝试了iter_any() 而不是read(),但仍然遇到相同的错误...
async for data in r.content.iter_any():
await outfile.write(data)
我尝试了readline() 仍然遇到同样的错误...
async for line in r.content.readline():
await outfile.write(line)
从那以后,我在代码的错误处理部分(不包括在原始问题中)中处理了一些重试功能,最终使作业得以完成。有效负载错误仍在发生,这仍然是主要问题,但重试下载是一个成功的解决方法。如果有人能够提供更多信息,问题仍然存在。
【问题讨论】:
-
SF 端的作业完成了吗?您可以在设置 -> 批量数据加载作业中看到它?
-
@eyescream 是的,作业完成没有问题,“已完成”批处理状态是我开始
retrieveResults()函数的触发器。 -
不知道,抱歉。如果您从 Postman、curl、SF Workbench 发出相同的请求怎么办?如果它似乎在沙盒中运行良好,但在生产/开发人员版本中死机 - 也许您正在耗尽滚动的 24 小时 API 请求限制? (理想情况下,您会看到有关它的 HTTP 标头)
-
我应该补充一点,我相信这会在数据流式传输到磁盘时发生(
while True:部分),因为将开始 200Mb 下载,然后在下载过程中随机出现错误,但并非总是如此.我知道我没有用尽 API 限制的事实——我经常关注它并且始终低于 5% 的使用率。我将尝试常规的requests看看我是否至少可以完成我的下载,只是不喜欢失去异步功能。 -
我有一个脚本抛出了同样的错误,并且正在检索已经成功运行了几个月的 JSON 数据。对我来说,问题出在服务器上。磁盘已满。一旦我清理了一些空间,它就又开始正常工作了。您可能需要联系服务器管理员进行检查。
标签: python python-3.x asynchronous salesforce aiohttp