Sources#
Reading data into BDT means using any of Spark's supported data sources, then applying ST functions to transform the geometry into a shape struct with metadata. BDT also includes custom data sources for reading File Geodatabase and Shapefile. This section demonstrates loading gdb, shp, csv, parquet, and jdbc sources.
File Geodatabase#
Read a File Geodatabase into a DataFrame using the “com.esri.gdb” format. The two required options are the path to the gdb file and the name of the Feature Class to read into the DataFrame.
BDT can also be used to discover the Feature Class names available in a gdb. Simply leave off the name option, and the DataFrame returned by .load() will contain all of the layer names in a column.
# Feature Class Names
df_names = (
    spark
    .read
    .format("gdb")
    .options(path="./src/test/resources/Redlands.gdb")
    .load()
)
The Shape column must be created with one of the ST functions: ST_FromFGDBPoint, ST_FromFGDBPolyline, or ST_FromFGDBPolygon. Use the withMeta function to set the metadata on the outgoing DataFrame.
Compressed File Geodatabases are not yet supported. For now, decompress the gdb before reading it into your BDT job.
It is important to set the numPartitions option because by default the data is read into only 1 partition.
Note: FGDB feature classes already have a column named “Shape” that is different from the BDT Shape struct. If you want to select all the columns from the DataFrame, alias the new Shape column to avoid a name collision, then drop the FGDB Shape column and rename the alias back to SHAPE. To skip the aliasing and dropping steps, select only the specific columns you need in selectExpr.
# Point
df_point = (
    spark
    .read
    .format("gdb")
    .options(path="./src/test/resources/Redlands.gdb", name="Policy_25M", numPartitions=40)
    .load()
    .selectExpr("ST_FromFGDBPoint(Shape) as NEW_SHAPE", "*")
    .drop("Shape")
    .withColumnRenamed("NEW_SHAPE", "SHAPE")
    .withMeta("Point", 4326)
)
# Polygon
df = (
    spark
    .read
    .format("gdb")
    .options(path="/mnt/data/BusinessAnalyst.gdb", name="DMA", numPartitions=16)
    .load()
    .selectExpr("ST_FromFGDBPolygon(Shape) as NEW_SHAPE", "*")
    .drop("Shape")
    .withColumnRenamed("NEW_SHAPE", "SHAPE")
    .withMeta("POLYGON", 4326)
)
# Polyline
df_polyline = (
    spark
    .read
    .format("gdb")
    .options(path="/mnt/data/Redlands.gdb", name="MapHighways", numPartitions=16)
    .load()
    .selectExpr("OBJECTID Link_ID", "ST_FromFGDBPolyline(Shape) Shape")
    .withMeta("POLYLINE", 3857)
)
Shapefile#
As with File Geodatabase, it is important to set the numPartitions option for Shapefile sources because the default is only 1 partition.
df = (
    spark
    .read
    .format("shp")
    .options(path="/mnt/data/North_American_Roads/North_American_Roads.shp", format="WKB", numPartitions=16, repair="OGC", wkid="4326")
    .load()
    .selectExpr("ST_FromWKB(shape) as SHAPE_NEW", "*")
    .drop("shape")
    .withColumnRenamed("SHAPE_NEW", "SHAPE")
    .withMeta("Polyline", 4326)
)
The following options are accepted:
path
    The location of shapefile(s). Supports the use of wildcard characters in specifying file paths, e.g. /path/*.shp.
shape
    An optional name of the shape column. Default value is shape.
columns
    An optional list of comma separated attribute column names. Default value is blank, indicating all attribute fields.
format
    An optional parameter to define the output format of the shape field. Default value is SHP. Possible values are:
        SHP - Esri binary shape format.
        WKT - Well Known Text.
        GEOJSON - GeoJSON.
repair
    An optional parameter to repair the read geometry. Possible values are:
        None - No repair.
        Esri - Apply Esri repair operator.
        OGC - Apply OGC repair operator.
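For example, here is a minimal sketch that combines a few of these options, reading only selected attribute columns and returning the geometry as WKT; the path and column names below are placeholders:
# Hypothetical example: read selected attribute columns, request WKT output,
# and build the BDT shape struct with ST_FromText.
df_roads = (
    spark
    .read
    .format("shp")
    .options(path="/mnt/data/roads/*.shp", columns="ROAD_ID,ROAD_NAME", format="WKT", numPartitions=8)
    .load()
    .selectExpr("ST_FromText(shape) as SHAPE", "ROAD_ID", "ROAD_NAME")
    .withMeta("Polyline", 4326)
)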
For more information, see the spark-shp repository.
csv#
Use the Spark API to read formats like csv, parquet, and jdbc, as well as any other data source Spark supports.
If the point is stored in X and Y columns, then use ST_FromXY or ST_MakePoint in the SQL statement.
If the shape column is WKT, then use ST_FromText on the shape column in the SQL statement.
If the shape column is WKB, then use ST_FromWKB on the shape column in the SQL statement.
Set the metadata of the shape column.
df1 = (
    spark
    .read
    .option("mode", "failfast")
    .option("delimiter", "\\t")
    .option("header", "false")
    .schema("MID LONG,LON DOUBLE,LAT DOUBLE,Quarter STRING,LineOfBusiness STRING,TotalInsuredValue DOUBLE,TotalReplacementValue DOUBLE,ConstructionType STRING,ISOType STRING,YearBuilt STRING,WKT STRING")
    .csv("/mnt/data/policy_points.csv")
    .selectExpr("ST_FromText(WKT) as SHAPE", "MID", "Quarter", "TotalReplacementValue")
    .where("Quarter = '2'")
    .withMeta("Point", 4326, shapeField="SHAPE")
    .repartition(8)
).cache()
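If the points are stored as plain X and Y columns instead of WKT, the same pattern applies with ST_FromXY. The sketch below assumes hypothetical column names LON and LAT, a hypothetical file path, and that ST_FromXY takes x (longitude) followed by y (latitude):
# Hypothetical example: points stored as longitude/latitude columns rather than WKT.
df_xy = (
    spark
    .read
    .option("header", "true")
    .schema("MID LONG,LON DOUBLE,LAT DOUBLE")
    .csv("/mnt/data/points_xy.csv")
    .selectExpr("ST_FromXY(LON, LAT) as SHAPE", "MID")
    .withMeta("Point", 4326, shapeField="SHAPE")
)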
parquet#
Parquet has many performance advantages and can store complex types like arrays and structs, so it is well suited for persisting the BDT shape struct with metadata.
If the shape column is WKT, then use ST_FromText on the shape column in the SQL statement.
If the shape column is WKB, then use ST_FromWKB on the shape column in the SQL statement.
If the shape column is already a shape struct, then no ST function is needed.
Parquet can also persist column metadata, so if you write data with a BDT shape struct you can read it back in with a simple read:
df = spark\
.read\
.parquet("/mnt/data/Counties.parquet")
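The write side of that round trip is just the standard Spark parquet writer; a sketch assuming df already carries a BDT shape struct (the output path is a placeholder):
# Persist a DataFrame whose SHAPE column is already a BDT shape struct;
# the struct and its column metadata survive the round trip through parquet.
df.write.mode("overwrite").parquet("/mnt/data/Counties.parquet")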
When the parquet stores geometry in other formats, the ST functions can transform the source data:
df = spark\
.read\
.parquet("/mnt/data/delivery_zones.parquet")\
.selectExpr("ST_FromWKB(geometry) Shape", "m_id")\
.withMeta("POLYGON", 4326)
jdbc#
Read from relational databases using Spark’s jdbc format. Depending on how geometry is stored in the database, you can use the Spark JDBC query option to invoke the database’s spatial functions and extract WKT or WKB, then use BDT ST functions to create the Shape struct.
# Geometry stored in postgres
df_jdbc = spark.read\
.format("jdbc")\
.option("url", f'mypostgres')\
.option("query", "SELECT ST_AsText(Shape) actual_wkt, wkt as expected_wkt, id from point")\
.option("user", 'myuser')\
.option("password", '********')\
.load()\
.selectExpr("ST_FromText(actual_wkt) Shape", "id")
.withMeta("POINT", 4326)
# Geometry stored in SQL Server
df_jdbc = spark.read\
.format("jdbc")\
.option("url", f'mysqlserver')\
.option("query", "SELECT *, Shape.STAsBinary() WKB FROM dbo.Buffer")\
.option("user", 'myuser')\
.option("password", '********')\
.load()\
.selectExpr("M_ID", "ST_FromWKB(WKB) Shape")\
.withMeta("POINT", 4326)