-
Notifications
You must be signed in to change notification settings - Fork 48
Expand file tree
/
Copy pathzonal-algebra.pymd
More file actions
127 lines (91 loc) · 5.92 KB
/
zonal-algebra.pymd
File metadata and controls
127 lines (91 loc) · 5.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# Zonal Map Algebra
```python setup, echo=False
from IPython.display import display
import pyrasterframes
from pyrasterframes.utils import create_rf_spark_session
from pyrasterframes.rasterfunctions import *
import pyrasterframes.rf_ipython
from pyspark.sql.functions import udf, lit
from geomesa_pyspark.types import MultiPolygonUDT
# This seemed to work for time-series!
spark = create_rf_spark_session("local[4]")
```
## Definition
Zonal map algebra refers to operations over raster cells based on the definition of a _zone_. In concept, a _zone_ is like a mask: a raster with a special value designating membership of the cell in the zone. In general, we assume that _zones_ are defined by @ref[vector geometries](vector-data.md).
## Analysis Plan
We will compute average @ref:[NDVI](local-algebra.md#computing-ndvi) over the month of May 2018 for two US national parks: Cuyahoga Valley and Indiana Dunes. We will select data from the @ref:[built-in catalog](raster-catalogs.md#using-built-in-experimental-catalogs), join it with park geometries, read _tiles_ for the bands needed, burn-in or rasterize the geometries to _tiles_, and compute the aggregate.
## Get Vector Data
First we download vector from the US National Park Service open data portal, and take a look at the data.
```python get_park_boundary
import requests
import folium
nps_filepath = '/tmp/2parks.geojson'
nps_data_query_url = 'https://services1.arcgis.com/fBc8EJBxQRMcHlei/arcgis/rest/services/' \
'NPS_Park_Boundaries/FeatureServer/0/query' \
'?geometry=-87.601,40.923,-81.206,41.912&inSR=4326&outSR=4326' \
"&where=UNIT_TYPE='National Park'&outFields=*&f=geojson"
r = requests.get(nps_data_query_url)
with open(nps_filepath,'wb') as f:
f.write(r.content)
m = folium.Map()
layer = folium.GeoJson(nps_filepath)
m.fit_bounds(layer.get_bounds())
m.add_child(layer)
m
```
Now we read the park boundary vector data as a Spark DataFrame using the built-in @ref:[geojson DataSource](vector-data.md#geojson-datasource). The geometry is very detailed, and the EO cells are relatively coarse. To speed up the processing, the geometry we "simplify" by combining vertices within about 500 meters of each other. For more on this see the section on Shapely support in @ref:[user defined functions](vector-data.md#shapely-geometry-support).
```python read_park_vector
park_vector = spark.read.geojson(nps_filepath)
@udf(MultiPolygonUDT())
def simplify(g, tol):
return g.simplify(tol)
park_vector = park_vector.withColumn('geo_simp', simplify('geometry', lit(0.005))) \
.select('geo_simp', 'OBJECTID', 'UNIT_NAME') \
.hint('broadcast')
```
## Catalog Read
Both parks are entirely contained in MODIS granule h11 v04. We will simply filter on this granule, rather than using a @ref:[spatial relation](vector-data.md#geomesa-functions-and-spatial-relations).
```python query_catalog
cat = spark.read.format('aws-pds-modis-catalog').load().repartition(50)
park_cat = cat \
.filter(
(cat.granule_id == 'h11v04') &
(cat.acquisition_date >= lit('2018-05-01')) &
(cat.acquisition_date < lit('2018-06-01'))
) \
.crossJoin(park_vector)
park_cat.printSchema()
```
We will combine the park geometry with the catalog, and read only the bands of interest to compute NDVI, which we discussed in a @ref:[previous section](local-algebra.md#computing-ndvi).
Now we have a dataframe with several months of MODIS data for a single granule. However, the granule covers a great deal of area outside our park boundaries _zones_. To deal with this we will, first [reproject](https://gis.stackexchange.com/questions/247770/understanding-reprojection) the park geometry to the same @ref:[CRS](concepts.md#coordinate-reference-system--crs-) as the imagery. Then we will filter to only the _tiles_ intersecting the park _zones_.
```python read_catalog
raster_cols = ['B01', 'B02',] # red and near-infrared respectively
park_rf = spark.read.raster(
park_cat.select(['acquisition_date', 'granule_id'] + raster_cols + park_vector.columns),
catalog_col_names=raster_cols) \
.withColumn('park_native', st_reproject('geo_simp', lit('EPSG:4326'), rf_crs('B01'))) \
.filter(st_intersects('park_native', rf_geometry('B01')))
park_rf.printSchema()
```
## Define Zone Tiles
Now we have the vector representation of the park boundary alongside the _tiles_ of red and near infrared bands. Next, we need to create a _tile_ representation of the park to allow us to limit the raster analysis to pixels within the park _zone_. This is similar to the masking operation demonstrated in @ref:[Masking](masking.md#masking). We rasterize the geometries using @ref:[`rf_rasterize`](reference.md#rf-rasterize): this creates a new _tile_ column aligned with the imagery, and containing the park's OBJECTID attribute for cells intersecting the _zone_. Cells outside the park _zones_ have a NoData value.
```python burn_in
rf_park_tile = park_rf \
.withColumn('dims', rf_dimensions('B01')) \
.withColumn('park_zone_tile', rf_rasterize('park_native', rf_geometry('B01'), 'OBJECTID', 'dims.cols', 'dims.rows')) \
.persist()
rf_park_tile.printSchema()
```
## Compute Zonal Statistics
We compute NDVI as the normalized difference of near infrared (band 2) and red (band 1). The _tiles_ are masked by the `park_zone_tile`, limiting the cells to those in the _zone_. To finish, we compute our desired statistics over the NVDI _tiles_ that are limited by the _zone_.
```python ndvi_zonal, evaluate=True
from pyspark.sql.functions import col
from pyspark.sql import functions as F
rf_ndvi = rf_park_tile \
.withColumn('ndvi', rf_normalized_difference('B02', 'B01')) \
.withColumn('ndvi_masked', rf_mask('ndvi', 'park_zone_tile'))
zonal_mean = rf_ndvi \
.groupby('OBJECTID', 'UNIT_NAME') \
.agg(rf_agg_mean('ndvi_masked'))
zonal_mean
```