Point In Polygon: Processor#

Table of Contents#

  1. Data Generation

  2. Point In Polygon

[ ]:
import bdt
bdt.auth("bdt.lic")
from bdt import functions as F
from bdt import processors as P
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
BDT has been successfully authorized!

            Welcome to
             ___    _                ___         __             ______             __   __     _   __
            / _ )  (_)  ___ _       / _ \ ___ _ / /_ ___ _     /_  __/ ___  ___   / /  / /__  (_) / /_
           / _  | / /  / _ `/      / // // _ `// __// _ `/      / /   / _ \/ _ \ / /  /  '_/ / / / __/
          /____/ /_/   \_, /      /____/ \_,_/ \__/ \_,_/      /_/    \___/\___//_/  /_/\_\ /_/  \__/
                      /___/

BDT python version: v3.3.0-v3.3.0
BDT jar version: v3.3.0-v3.3.0

Part 1: Generate Sample Data#

pip_processor

  • Both the Polygon and Point Dataframes are created from scratch by specifying the data and schema

    • A unique identifier and the associated WKT value are added to each row

Create the Polygon Data#

[ ]:
polygon1 = """POLYGON ((-117.17 34.08,
                       -117.17 34.06,
                       -117.21 34.06,
                       -117.21 34.08))"""
polygon2 = """POLYGON ((-117.23 34.09,
                       -117.19 34.08,
                       -117.19 34.05,
                       -117.23 34.05))"""

polySchema = StructType([StructField("POLY_ID", IntegerType()),
                         StructField("POLY_WKT",StringType())])

polyData = [(1, polygon1),
            (2, polygon2)]

polyDF = spark.createDataFrame(data = polyData, schema = polySchema)
polyDF.show(truncate = True)
+-------+--------------------+
|POLY_ID|            POLY_WKT|
+-------+--------------------+
|      1|POLYGON ((-117.17...|
|      2|POLYGON ((-117.23...|
+-------+--------------------+

Create the Point Data#

[ ]:
point1 = "POINT (-117.21 34.08)"
point2 = "POINT (-117.22 34.06)"
point3 = "POINT (-117.20 34.07)"
point4 = "POINT (-117.18 34.07)"
point5 = "POINT (-117.18 34.07)"
point6 = "POINT (-117.20 34.09)"

pointSchema = StructType([StructField("POINT_ID", IntegerType()),
                          StructField("POINT_WKT",StringType())])

pointData = [(1, point1),
             (2, point2),
             (3, point3),
             (4, point4),
             (5, point5),
             (6, point6)]

pointDF = spark.createDataFrame(data = pointData, schema = pointSchema)
pointDF.show(truncate = True)
+--------+--------------------+
|POINT_ID|           POINT_WKT|
+--------+--------------------+
|       1|POINT (-117.21 34...|
|       2|POINT (-117.22 34...|
|       3|POINT (-117.20 34...|
|       4|POINT (-117.18 34...|
|       5|POINT (-117.18 34...|
|       6|POINT (-117.20 34...|
+--------+--------------------+

Part 2: Point in Polygon#

Create SHAPE structs in each DataFrame#

st_FromText converts a WKT representation of a geometry to BDT’s internal Shape Struct object. The default convention is to label this Shape Struct as ‘SHAPE’.

[ ]:
polyDF_S = polyDF\
           .select("*", F.st_fromText("POLY_WKT").alias("SHAPE"))\
           .withMeta("POLYGON", 4326)
polyDF_S.show()
+-------+--------------------+--------------------+
|POLY_ID|            POLY_WKT|               SHAPE|
+-------+--------------------+--------------------+
|      1|POLYGON ((-117.17...|{[01 06 00 00 00 ...|
|      2|POLYGON ((-117.23...|{[01 06 00 00 00 ...|
+-------+--------------------+--------------------+

[ ]:
pointDF_S = pointDF\
            .select("*", F.st_fromText("POINT_WKT").alias("SHAPE"))\
            .withMeta("POINT", 4326)
pointDF_S.show(3)
+--------+--------------------+--------------------+
|POINT_ID|           POINT_WKT|               SHAPE|
+--------+--------------------+--------------------+
|       1|POINT (-117.21 34...|{[01 01 00 00 00 ...|
|       2|POINT (-117.22 34...|{[01 01 00 00 00 ...|
|       3|POINT (-117.20 34...|{[01 01 00 00 00 ...|
+--------+--------------------+--------------------+
only showing top 3 rows

Processor PointInPolygon (PIP)#

Processor PIP is a highly optimized and easy to use method for running point in polygon jobs.

Set the cell size to 5 degrees

cellSize determines the QR value(s) assigned to a given geometry

[ ]:
cellSize = 5.0

Call the processor, passing in the two DataFrames as the first arguments (point DataFrame then polygon DataFrame) and then the other required and optional parameters. * setting take = 2 means up to 2 overlapping polygons will be returned for a given point * setting emitPippedOnly = true means only points that are contained within a polygon will be returned in the output

[ ]:
pipped = P.pip(pointDF_S,
         polyDF_S,
         cellSize=cellSize,
         take = 2,
         emitPippedOnly = True)

print(pipped.count())
pipped.show(3)
6
+--------+--------------------+--------------------+-------+--------------------+
|POINT_ID|           POINT_WKT|               SHAPE|POLY_ID|            POLY_WKT|
+--------+--------------------+--------------------+-------+--------------------+
|       1|POINT (-117.21 34...|{[01 01 00 00 00 ...|      2|POLYGON ((-117.23...|
|       4|POINT (-117.18 34...|{[01 01 00 00 00 ...|      1|POLYGON ((-117.17...|
|       2|POINT (-117.22 34...|{[01 01 00 00 00 ...|      2|POLYGON ((-117.23...|
+--------+--------------------+--------------------+-------+--------------------+
only showing top 3 rows