【发布时间】:2022-01-23 01:59:10
【问题描述】:
我目前正在 Azure 数据工厂和 python 上构建数据管道。 python 脚本执行简单的网络抓取并将文件保存在本地,然后将该文件上传到 Azure 存储上的相应 blob 容器和 Azure SQL 数据库。
但是，我正在寻找的解决方案是跳过任何本地存储 - 换句话说，在 python 脚本完成网络抓取之后，直接将数据写入 blob 容器中的新 blob 文件，而无需依赖任何本地存储。
有谁知道任何好的教程/资源/这在 Azure 数据工厂中是否可行?谢谢!
from typing import Container
import requests
from bs4 import BeautifulSoup
import pandas as pd
import csv
from azure.storage.blob import BlobClient
import pandas as pd
import os
import yaml
from yaml.loader import Loader
from azure.storage.blob import ContainerClient
class RightmoveScraper:
    """Scrape Rightmove listing pages, write the results to a CSV file and
    upload the produced files to an Azure Blob Storage container.

    Typical usage: ``RightmoveScraper().run()``
    """

    def __init__(self):
        # Per-instance result list. The original used a class-level mutable
        # attribute, which would have been shared across all instances.
        self.results = []

    def fetch(self, url):
        """HTTP GET `url` and return the `requests.Response`."""
        print('HTTP GET request to URL')  # fixed typo: "reuqest"
        response = requests.get(url)
        print(' | Status code: %s' % response.status_code)
        return response

    def parse(self, html):
        """Parse one Rightmove search-results page and append one dict per
        listing to ``self.results``.

        The selector arguments are passed to BeautifulSoup exactly as in the
        original script (from the ws.io tutorial) so matching behavior is
        unchanged.
        """
        content = BeautifulSoup(html, 'lxml')

        cards = content.find_all('div', {'l-searchResult is-list'})
        titles = [title.text.strip() for title in content.findAll('h2', {'class': 'propertyCard-title'})]
        addresses = [address.text.strip() for address in content.findAll('address', {'propertyCard-address'})]
        descriptions = [description.text for description in content.findAll('span', {'data-test': 'property-description'})]
        prices = [price.text.strip() for price in content.find_all('div', {'propertyCard-priceValue'})]
        dates = [date.text.split()[-1] for date in content.findAll('span', {'propertyCard-branchSummary-addedOrReduced'})]
        agents = [agent.text.split('by')[-1].strip() for agent in content.findAll('div', {'propertyCard-branchSummary'})]
        isSale = 'Sale'
        # NOTE(review): scraped but not yet written to the results — kept for
        # future use; confirm whether they should appear in the CSV.
        agentnumbers = [agentnumber.text for agentnumber in content.findAll('a', {'propertyCard-contactsPhoneNumber'})]
        totalresults = [totalresult.text for totalresult in content.findAll('span', {'searchHeader-resultCount'})]

        floorplans = []
        photos = []
        virtualtours = []
        for card in cards:
            # Look the three optional elements up independently so a missing
            # element cannot leave the parallel lists out of sync.  The
            # original bare `except` re-appended to `floorplans` and *called*
            # `virtualtours(None)` instead of appending to it.
            floorplan = card.find('span', {'no-svg-floorplan propertyCard-moreInfoIcon'})
            photo = card.find('span', {'propertyCard-moreInfoNumber'})
            virtualtour = card.find('span', {'no-svg-virtualtour propertyCard-moreInfoIcon'})
            floorplans.append('Has floorplan' if floorplan is not None else 'No floorplan')
            photos.append(photo.text if photo is not None else 'No photos')
            virtualtours.append('Has virtual tour' if virtualtour is not None else 'No virtual tour')

        for index in range(len(titles)):
            self.results.append({
                'title': titles[index],
                'address': addresses[index],
                'description': descriptions[index],
                'prices': prices[index],
                'dates': dates[index],
                'agents': agents[index],
                'isSale': isSale,
                'floorplan?': floorplans[index],
                'photos': photos[index],
                'virtualtours': virtualtours[index],
            })

    @staticmethod
    def no_blank(fd):
        """Yield the non-blank lines of an open file object.

        The original defined this without `self`; it is a plain utility, so
        it is now a staticmethod.  Only end-of-iteration ends the generator;
        real I/O errors propagate instead of being swallowed by a bare
        ``except``.
        """
        try:
            while True:
                line = next(fd)
                if len(line.strip()) != 0:
                    yield line
        except StopIteration:
            return

    def to_csv(self):
        """Write ``self.results`` to ``rightmove.csv``, skipping rows whose
        fields are all blank."""
        with open('rightmove.csv', 'w', newline='') as csv_file:
            # keys() of the first result dict provides the CSV header.
            writer = csv.DictWriter(csv_file, fieldnames=self.results[0].keys())
            writer.writeheader()
            for row in self.results:
                # The original tested `field.strip` (a bound method, always
                # truthy) over the dict's *keys*, so nothing was ever
                # skipped; check the row's values instead.
                if any(str(value).strip() for value in row.values()):
                    writer.writerow(row)
        print('Stored results to "rightmove.csv"')

    def run(self):
        """Entry point: parse the locally saved page, write the CSV, then
        upload the configured folder to Azure."""
        # response = self.fetch('https://www.rightmove.co.uk/property-for-sale/Central-London.html')
        # The original mixed `for line in html_file` with `.read()`, which
        # silently dropped the first line; read the whole file once instead.
        with open('res.html', 'r') as html_file:
            html = html_file.read()
        self.parse(html)
        self.to_csv()
        self.sendToAzure()

    ######## uploading files to azure ###############

    def load_config(self):
        """Load ``config.yaml`` sitting next to this script and return it as
        a dict."""
        dir_root = os.path.dirname(os.path.abspath(__file__))
        with open(os.path.join(dir_root, "config.yaml"), "r") as yamlfile:
            # FullLoader is acceptable for a trusted, locally-maintained
            # config file; never use it on untrusted input.
            return yaml.load(yamlfile, Loader=yaml.FullLoader)

    def get_files(self, dir):
        """Yield the non-hidden regular files directly inside `dir` as
        ``os.DirEntry`` objects."""
        with os.scandir(dir) as entries:
            for entry in entries:
                if entry.is_file() and not entry.name.startswith('.'):
                    yield entry

    def upload(self, files, connection_string, container_name):
        """Upload each ``os.DirEntry`` in `files` to the given blob
        container, one blob per file, named after the file."""
        container_client = ContainerClient.from_connection_string(connection_string, container_name)
        print("Uploading files to blob storage...")
        for file in files:
            blob_client = container_client.get_blob_client(file.name)
            with open(file.path, "rb") as data:
                # (removed leftover debug `print(data)` that printed the
                # raw file-object repr)
                blob_client.upload_blob(data)
            print(f'{file.name} uploaded to blob storage')

    def sendToAzure(self):
        """Read the Azure settings from config.yaml and upload everything in
        the configured source folder."""
        config = self.load_config()
        datasets = self.get_files(config["source_folder"])
        self.upload(datasets, config["azure_storage_connectionstring"], config["data_containername"])
if __name__ == '__main__':
    # Run the scraper end-to-end when executed as a script.
    RightmoveScraper().run()
【问题讨论】:
-
转换 Python 以在 Databricks 笔记本或 Synapse 笔记本中运行?
-
感谢@wBob 的建议！如果我把脚本转换为在 Databricks 中运行，是否就可以避免把文件保存到我的本地计算机上？
-
不。您可以直接保存到 blob 存储,也可以在运行时将其临时保存到与按需 Databricks 集群关联的存储中。我认为这对您来说是一个更好的选择,因为它不需要任何本地存储。
标签: python azure azure-blob-storage azure-data-factory