import altair as alt
import ee
from earthengine_dask.core import ClusterGEE
import google.auth
import pandas as pd
earthengine-dask
Scale up concurrent requests to Earth Engine interactive endpoints with Dask
Prerequisites
- A Google Earth Engine account.
- Access to a Google Cloud Platform (GCP) project with the Earth Engine API enabled.
- A Coiled account that is set up to use the GCP project.
Installation
TODO...
How to use
Import Python packages
Authenticate & Initialize Earth Engine
Get credentials and the GCP project ID, authenticating if necessary.
# Use existing Application Default Credentials (ADC) if present; otherwise run
# the interactive gcloud login flow once and retry.
import subprocess

try:
    credentials, project_id = google.auth.default()
except google.auth.exceptions.DefaultCredentialsError:
    # Run via subprocess (instead of the IPython `!` shell magic) so this cell
    # is also valid plain Python; check=True makes a failed login raise here
    # rather than surfacing later as a second DefaultCredentialsError.
    subprocess.run(
        ['gcloud', 'auth', 'application-default', 'login'], check=True
    )
    credentials, project_id = google.auth.default()

# NOTE: google.auth.default() may return project_id=None when the ADC has no
# associated project; ee.Initialize() may then fail — set a quota project if so.
ee.Initialize(credentials=credentials, project=project_id)
Start Dask Cluster
Start up an Earth Engine-enabled cluster. This may take a few minutes to complete.
# Provision the Earth Engine-enabled Dask cluster. The keyword arguments
# (cluster name, worker count, CPUs per worker, region) are presumably
# forwarded to Coiled — confirm against ClusterGEE.
cluster = ClusterGEE(
    name='test-class-cluster',
    n_workers=2,
    worker_cpu=8,
    region='us-central1',
)
Retrieve a client for the cluster and display it.
# Connect a Dask client to the cluster. The bare name on the last line renders
# the client's summary when this runs as a notebook cell.
client = cluster.get_client()
client
Submit Jobs
Test it out by:
- Defining a function that can be distributed,
- Submitting jobs that run the function on the workers,
- Gathering the results locally, and
- Displaying the results.
# Get a sorted list of the distinct country names to analyze.
country_fc = ee.FeatureCollection('USDOS/LSIB_SIMPLE/2017')
country_list = (
    country_fc.aggregate_array('country_na').distinct().sort().getInfo()
)
# Write a function that can be run by the cluster workers.
def get_country_stats(country_name):
    """Return the area and mean elevation of one country.

    Args:
        country_name: A value of the ``country_na`` property of the
            module-level ``country_fc`` collection.

    Returns:
        A dict with keys:
          - ``'country'``: the input ``country_name``.
          - ``'area_km2'``: the area of the matching features' geometry,
            converted from square meters to km^2 and rounded.
          - ``'mean_elev'``: the mean of the Copernicus GLO-30 ``DEM`` band
            over that geometry at a 10 km scale. This may be ``None`` if the
            reduction yields no value.

    Note:
        Reads the module-level ``country_fc``, which must be usable on the
        workers. Presumably Dask ships it along with the pickled function;
        confirm.
    """
    country = country_fc.filter(ee.Filter.eq('country_na', country_name))
    geometry = country.geometry()
    elev = ee.ImageCollection("COPERNICUS/DEM/GLO30").select('DEM').mosaic()
    # Bundle both server-side values into one ee.Dictionary, so a single
    # getInfo() round trip fetches them (one request per country, not two).
    stats = ee.Dictionary({
        'area_km2': geometry.area().multiply(1e-6).round(),
        'mean_elev': elev.reduceRegion(
            reducer=ee.Reducer.mean(),
            geometry=geometry,
            scale=10000,
        ).get('DEM'),
    }).getInfo()
    return {
        'country': country_name,
        'area_km2': stats.get('area_km2'),
        'mean_elev': stats.get('mean_elev'),
    }
# Create one job per country and submit them all to the workers; the result is
# a list of futures in the same order as country_list.
submitted_jobs = client.map(get_country_stats, country_list)
# Gather the results from the workers and display them as a table.
results = client.gather(submitted_jobs)
df = pd.DataFrame(results)
df
Shut down the cluster
# Shut down the Dask cluster now that the results have been gathered.
# NOTE(review): the client from cluster.get_client() is not closed here;
# consider calling client.close() first — confirm what ClusterGEE.shutdown() does.
cluster.shutdown()