Introduction
We'll be working through Beginning Apache Spark using Azure Databricks.
Development environment
Chapter 5: Getting Data into Databricks
```
%fs ls /
```

```
path,name,size,modificationTime
dbfs:/FileStore/,FileStore/,0,1649813340000
dbfs:/databricks-datasets/,databricks-datasets/,0,0
dbfs:/databricks-results/,databricks-results/,0,0
dbfs:/local_disk0/,local_disk0/,0,1650294537000
dbfs:/tmp/,tmp/,0,1650699514000
dbfs:/user/,user/,0,1650294511000
```
```
%fs ls /databricks-datasets/
```

```
path,name,size,modificationTime
dbfs:/databricks-datasets/COVID/,COVID/,0,1650702268109
dbfs:/databricks-datasets/README.md,README.md,976,1532468253000
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0,1650702268109
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359,1455043490000
dbfs:/databricks-datasets/adult/,adult/,0,1650702268109
dbfs:/databricks-datasets/airlines/,airlines/,0,1650702268109
...
```
```
%fs ls /databricks-datasets/airlines/
```

```
path,name,size,modificationTime
dbfs:/databricks-datasets/airlines/README.md,README.md,1089,1454697889000
dbfs:/databricks-datasets/airlines/_SUCCESS,_SUCCESS,0,1436493184000
dbfs:/databricks-datasets/airlines/part-00000,part-00000,67108879,1436493184000
dbfs:/databricks-datasets/airlines/part-00001,part-00001,67108862,1436493185000
...
```
```
%fs head /databricks-datasets/airlines/README.md
```

```
================================================
Airline On-Time Statistics and Delay Causes
================================================

## Background
The U.S. Department of Transportation's (DOT) Bureau of Transportation Statistics (BTS) tracks the on-time performance of domestic flights operated by large air carriers.
...
```
```
%fs cp /databricks-datasets/airlines/README.md /
```

```
res2: Boolean = true
```
```
%fs rm /README.md
```

```
res3: Boolean = true
```
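The same copy-and-remove round trip can be done from Python with `dbutils.fs`, which is handy when you want the Boolean result in program logic rather than as cell output. A minimal sketch mirroring the two `%fs` cells above:

```python
# Python equivalent of the %fs cp / %fs rm cells above.
# dbutils.fs methods return True on success, so the result can drive logic.
copied = dbutils.fs.cp("dbfs:/databricks-datasets/airlines/README.md", "dbfs:/README.md")
print(copied)   # True

removed = dbutils.fs.rm("dbfs:/README.md")
print(removed)  # True
```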
```
dbutils.fs.ls("/databricks-datasets")
```

```
Out[1]: [FileInfo(path='dbfs:/databricks-datasets/COVID/', name='COVID/', size=0, modificationTime=1650702243729),
 FileInfo(path='dbfs:/databricks-datasets/README.md', name='README.md', size=976, modificationTime=1532468253000),
 FileInfo(path='dbfs:/databricks-datasets/Rdatasets/', name='Rdatasets/', size=0, modificationTime=1650702243729),
 FileInfo(path='dbfs:/databricks-datasets/SPARK_README.md', name='SPARK_README.md', size=3359, modificationTime=1455043490000),
 FileInfo(path='dbfs:/databricks-datasets/adult/', name='adult/', size=0, modificationTime=1650702243729),
 FileInfo(path='dbfs:/databricks-datasets/airlines/', name='airlines/', size=0, modificationTime=1650702243729),
 ...
```
```
dbutils.fs.head("/databricks-datasets/airlines/README.md")
```

```
Out[3]: "================================================\nAirline On-Time Statistics and Delay Causes\n================================================\n\n## Background\nThe U.S. Department of Transportation's (DOT) Bureau of Transportation Statistics (BTS) tracks the on-time performance of domestic flights operated by large air carriers.
```
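`head` reads up to 64 KB by default; an optional second argument caps the number of bytes returned, which is useful for peeking at large files:

```python
# Read only the first 100 bytes (maxBytes is optional, default 65536).
dbutils.fs.head("/databricks-datasets/airlines/README.md", 100)
```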
```python
files = dbutils.fs.ls("/")
for f in files:
    print(f.name)
```

```
FileStore/
databricks-datasets/
databricks-results/
local_disk0/
tmp/
user/
```
```python
x = [print(f.name) for f in dbutils.fs.ls("/")]
```

```
FileStore/
databricks-datasets/
databricks-results/
local_disk0/
tmp/
user/
```
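In a Databricks notebook you can also pass the `FileInfo` list straight to `display()` to get a sortable table instead of printed names:

```python
# Render the directory listing as an interactive table in the notebook output.
display(dbutils.fs.ls("/"))
```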
The original snippet nested double quotes inside double quotes, which is a syntax error; single quotes around the HTML string fix it. The `/files/` URL path maps to `dbfs:/FileStore/`, so this renders an image previously uploaded there.

```python
displayHTML('<img src="/files/images/logo.png" />')
```
```
%sh
cd /tmp
wget http://ergast.com/downloads/f1db_csv.zip
```

```
%sh ls /tmp
```

```
...
f1db_csv.zip
...
```
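Note that `%sh` runs on the driver node, so the downloaded zip lives on the driver's local disk, not on DBFS; that is why it gets moved under `/dbfs` later. A quick Python check (assuming the wget above succeeded):

```python
import os

# The %sh download lands on the driver-local filesystem, not DBFS.
print(os.path.exists("/tmp/f1db_csv.zip"))   # True: driver-local path
print(os.path.getsize("/tmp/f1db_csv.zip"))  # size in bytes
```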
```
%sh
unzip -Z1 /tmp/f1db_csv.zip
```

```
circuits.csv
constructor_results.csv
constructors.csv
constructor_standings.csv
drivers.csv
driver_standings.csv
lap_times.csv
pit_stops.csv
qualifying.csv
races.csv
results.csv
seasons.csv
sprint_results.csv
status.csv
```
```
%sh
unzip -j /tmp/f1db_csv.zip constructors.csv -d /tmp
```

```
Archive:  /tmp/f1db_csv.zip
  inflating: /tmp/constructors.csv
```
```
%sh ls /tmp/*.csv
```

```
/tmp/constructors.csv
```
```
%sh
mv /tmp/constructors.csv /dbfs/tmp
```
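Since `/dbfs` is the FUSE mount of DBFS, the moved file should now be visible through the DBFS APIs as well; a quick check:

```python
# /dbfs/tmp on the driver corresponds to dbfs:/tmp, so the file shows up here.
[f.name for f in dbutils.fs.ls("dbfs:/tmp") if f.name.endswith(".csv")]
```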
```python
from requests import get

with open('/tmp/f1.zip', "wb") as file:
    response = get('http://ergast.com/downloads/f1db_csv.zip')
    file.write(response.content)
```
```python
from zipfile import ZipFile
with ZipFile('/tmp/f1.zip', 'r') as zip:
    files = zip.namelist()
    for file in files:
        print(file)
```

```
circuits.csv
constructor_results.csv
constructors.csv
constructor_standings.csv
drivers.csv
driver_standings.csv
lap_times.csv
pit_stops.csv
qualifying.csv
races.csv
results.csv
seasons.csv
sprint_results.csv
status.csv
```
```python
from zipfile import ZipFile
with ZipFile('/tmp/f1.zip', 'r') as zip:
    zip.extract('seasons.csv', '/tmp')
```
```python
import os
os.listdir("/tmp")
```

```
Out[22]: ['f1db_csv.zip',
 'seasons.csv',]
```
```python
dbutils.fs.mv("file:/tmp/seasons.csv", "dbfs:/tmp/seasons.csv")
```

```
Out[24]: True
```
```python
df = spark \
    .read \
    .format("csv") \
    .option("inferSchema","true") \
    .option("header","false") \
    .load("dbfs:/tmp/seasons.csv") \
    .selectExpr("_c0 as year", "_c1 as url")

df.write.saveAsTable('seasons')
```
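`inferSchema` costs an extra pass over the data; for a file this small it hardly matters, but declaring the schema up front is the usual alternative and also documents the expected types. A sketch of the same load with an explicit schema (same `dbfs:/tmp/seasons.csv` path):

```python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Declare the schema instead of inferring it (saves a scan of the file).
schema = StructType([
    StructField("year", IntegerType(), True),
    StructField("url", StringType(), True),
])

df = (spark.read
      .format("csv")
      .schema(schema)
      .load("dbfs:/tmp/seasons.csv"))
```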
```
%sql
create temporary table test (year INT, url STRING)
using csv options (path "dbfs:/tmp/seasons.csv", header "false", mode "FAILFAST");
select * from test;
```

```
FileReadException: Error while reading file dbfs:/tmp/seasons.csv.
Caused by: SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
Caused by: BadRecordException: java.lang.NumberFormatException: For input string: "year"
Caused by: NumberFormatException: For input string: "year"
```
```
%sql
show tables;
```

```
database,tableName,isTemporary
default,seasons,false
,test,true
```
```
%sql
drop table test;
```

```
OK
```
```
%sql
create temporary table test (year INT, url STRING)
using csv options (path "dbfs:/tmp/seasons.csv", header "false", mode "PERMISSIVE");
select * from test;
```

```
year,url
null,url
2009,http://en.wikipedia.org/wiki/2009_Formula_One_season
2008,http://en.wikipedia.org/wiki/2008_Formula_One_season
...
```
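The `null,url` row is the file's header line ("year,url") parsed as data: "year" cannot be cast to INT, so PERMISSIVE mode nulls it out rather than failing. Since seasons.csv evidently has a header row, letting the reader consume it avoids the bogus row entirely; a sketch in Python:

```python
# Treat the first line ("year,url") as the header instead of data,
# which removes the null row produced by PERMISSIVE mode above.
df = (spark.read
      .format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("dbfs:/tmp/seasons.csv"))

df.show(3)
```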
Mounting S3
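The chapter also covers mounting external storage; the mount cells aren't reproduced here, but as a rough sketch, an S3 bucket can be mounted with `dbutils.fs.mount`. The bucket name and keys below are placeholders, and in practice an instance profile or a secret scope (`dbutils.secrets.get`) is preferable to hard-coded credentials:

```python
import urllib.parse

# Placeholder credentials -- use a secret scope in practice.
access_key = "<aws-access-key-id>"
secret_key = urllib.parse.quote("<aws-secret-access-key>", safe="")
bucket = "<bucket-name>"

# Mount the bucket under /mnt/s3data; afterwards it is readable as dbfs:/mnt/s3data.
dbutils.fs.mount(
    source=f"s3a://{access_key}:{secret_key}@{bucket}",
    mount_point="/mnt/s3data")
```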
Mounting Blob Storage
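Similarly, an Azure Blob Storage container can be mounted via the wasbs driver; again a sketch with placeholder names:

```python
# Placeholder account/container names and key -- use dbutils.secrets.get in practice.
storage_account = "<storage-account>"
container = "<container>"
account_key = "<storage-account-key>"

# Mount the container under /mnt/blobdata.
dbutils.fs.mount(
    source=f"wasbs://{container}@{storage_account}.blob.core.windows.net",
    mount_point="/mnt/blobdata",
    extra_configs={
        f"fs.azure.account.key.{storage_account}.blob.core.windows.net": account_key
    })
```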
Chapter 6: Querying Data Using SQL
Chapter 7: The Power of Python
Chapter 8: ETL and Advanced Data Wrangling
Chapter 9: Connecting to and from Databricks
Chapter 10: Running in Production
Chapter 11: Bits and Pieces
If you're considering adopting Azure Databricks, leave it to ナレコム.
We provide support from initial deployment through to making full use of the platform. Feel free to get in touch.