【Question Title】: Is there a way to create a blob and upload the data to the blob container in Azure Data Factory?
【Posted】: 2022-01-23 01:59:10
【Question】:

I am currently building a data pipeline with Azure Data Factory and Python. The Python script performs a simple web scrape and saves the file locally; that file is then uploaded to the corresponding blob container on Azure Storage and to an Azure SQL database.

However, the solution I am looking for skips local storage entirely: right after the Python script scrapes the data, I want to write it to a new blob in the blob container without relying on any local storage.

Does anyone know of any good tutorials/resources, or whether this is possible in Azure Data Factory? Thanks!

import csv
import os

import requests
import yaml
from bs4 import BeautifulSoup
from azure.storage.blob import ContainerClient

class RightmoveScraper:
    results=[]

    def fetch(self, url):
        print('HTTP GET request to URL')
        response = requests.get(url)
        print(' | Status code: %s' % response.status_code)
        return response

    def parse(self,html):
        content = BeautifulSoup(html,'lxml')
        
        ## set selectors to scrape ##
        ## from ws.io


        cards = content.find_all('div', {'class': 'l-searchResult is-list'})
        titles = [title.text.strip() for title in content.find_all('h2', {'class': 'propertyCard-title'})]
        addresses = [address.text.strip() for address in content.find_all('address', {'class': 'propertyCard-address'})]
        descriptions = [description.text for description in content.find_all('span', {'data-test': 'property-description'})]
        prices = [price.text.strip() for price in content.find_all('div', {'class': 'propertyCard-priceValue'})]
        dates = [date.text.split()[-1] for date in content.find_all('span', {'class': 'propertyCard-branchSummary-addedOrReduced'})]
        agents = [agent.text.split('by')[-1].strip() for agent in content.find_all('div', {'class': 'propertyCard-branchSummary'})]
        agentnumbers = [agentnumber.text for agentnumber in content.find_all('a', {'class': 'propertyCard-contactsPhoneNumber'})]
        isSale = 'Sale'
        totalresults = [totalresult.text for totalresult in content.find_all('span', {'class': 'searchHeader-resultCount'})]
        floorplans = []
        photos=[]
        virtualtours=[]

        for card in cards:
            try:
                floorplans.append(card.find('span', {'class': 'no-svg-floorplan propertyCard-moreInfoIcon'}))
                photos.append(card.find('span', {'class': 'propertyCard-moreInfoNumber'}).text)
                virtualtours.append(card.find('span', {'class': 'no-svg-virtualtour propertyCard-moreInfoIcon'}))
            except AttributeError:
                floorplans.append(None)
                photos.append(None)
                virtualtours.append(None)
          
        for index in range(len(floorplans)):
            floorplans[index] = 'No floorplan' if floorplans[index] is None else 'Has floorplan'

            if photos[index] is None:
                photos[index] = 'No photos'

            virtualtours[index] = 'No virtual tour' if virtualtours[index] is None else 'Has virtual tour'
            

        for index in range(0,len(titles)):
            self.results.append({
                'title': titles[index],
                'address': addresses[index],
                'description': descriptions[index],
                'prices': prices[index],
                'dates': dates[index],
                'agents': agents[index],
                'isSale': isSale,
                'floorplan?': floorplans[index],
                'photos': photos[index],
                'virtualtours':virtualtours[index]
            })

    @staticmethod
    def no_blank(fd):
        # Yield only the non-empty lines from a file object
        for line in fd:
            if line.strip():
                yield line

    def to_csv(self):
        with open('rightmove.csv', 'w', newline='') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=self.results[0].keys())  # keys() returns a view object
            writer.writeheader()

            for row in self.results:
                # skip rows whose fields are all blank
                if any(str(field).strip() for field in row.values()):
                    writer.writerow(row)

            print('Stored results to "rightmove.csv"')

    def run(self):
        # response = self.fetch('https://www.rightmove.co.uk/property-for-sale/Central-London.html')
        with open('res.html', 'r') as html_file:
            html = html_file.read()
            # html_file.write(response.text)  # writes the response body into the HTML file

        self.parse(html)
        self.to_csv()
        self.sendToAzure()

######## uploading files to Azure ###############

    def load_config(self):
        dir_root = os.path.dirname(os.path.abspath(__file__))
        with open(dir_root + "/config.yaml","r") as yamlfile:
            return yaml.load(yamlfile, Loader=yaml.FullLoader) 
    
    def get_files(self,dir):
        with os.scandir(dir) as entries:       
            for entry in entries:
                if entry.is_file() and not entry.name.startswith('.'):
                    yield entry

    def upload(self, files, connection_string, container_name):
        container_client = ContainerClient.from_connection_string(connection_string, container_name)
        print('Uploading files to blob storage...')

        for file in files:
            blob_client = container_client.get_blob_client(file.name)
            with open(file.path, 'rb') as data:
                # overwrite=True lets the script re-run without a ResourceExistsError
                blob_client.upload_blob(data, overwrite=True)
                print(f'{file.name} uploaded to blob storage')

    def sendToAzure(self):
        config = self.load_config()
        datasets = self.get_files(config["source_folder"])
        self.upload(datasets,config["azure_storage_connectionstring"],config["data_containername"])
     
if __name__ == '__main__':
    scraper = RightmoveScraper()
    scraper.run()
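
For reference, the `to_csv` and `upload` steps above can be combined without ever touching disk by serializing the rows to an in-memory buffer instead of a file. A minimal sketch; the sample row is made up, and the commented upload line assumes a `blob_client` like the one built in `upload`:

```python
import csv
import io

def rows_to_csv_text(rows):
    """Serialize a list of dicts (like self.results) to CSV text in memory."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=rows[0].keys())
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()

# Hypothetical sample standing in for the scraper's self.results
sample = [{'title': 'Flat 1', 'address': 'Central London', 'prices': '£500,000'}]
csv_text = rows_to_csv_text(sample)
print(csv_text)
# The text can then go straight to Blob Storage, no local file needed:
#   blob_client.upload_blob(csv_text, overwrite=True)
```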
    

【Comments】:

  • Convert the Python to run in a Databricks notebook or a Synapse notebook?
  • Thanks for the suggestion @wBob! If I convert the script to Databricks, will it still avoid saving the file to my local machine?
  • No. You can save directly to blob storage, or save it temporarily at runtime to the storage attached to the on-demand Databricks cluster. I think that is a better option for you, since it requires no local storage at all.

Tags: python azure azure-blob-storage azure-data-factory


【Solution 1】:

Since, as you mentioned, you don't want to create a file locally, you can pass the data directly when uploading the blob.

I have used the dummy data "abc"; you can pass your scraped data to the blob_client.upload_blob(data, blob_type="BlockBlob") call instead.

Try this code:

from azure.storage.blob import BlobServiceClient

blob_service_client = BlobServiceClient.from_connection_string("Connection String")
# Get a client for the target blob in the "test" container
dest_file_name = 'test.csv'
print('Creating the file')
data = 'abc'
blob_client = blob_service_client.get_blob_client('test', dest_file_name)
blob_client.upload_blob(data, blob_type='BlockBlob')

Output:

It creates the blob and writes the data.

【Discussion】:

  • Which kind of storage account are you trying to upload it to?
  • Thanks for the suggestion @ShrutiJoshi-MT. I tried implementing this, but it throws an exception: ValueError: too many values to unpack (expected 2)
  • I uploaded it to a basic Azure storage account, into a blob container
  • What format is the data in after scraping?