core

Core functionality for distributing Earth Engine requests among Dask workers.


ClusterGEE

 ClusterGEE (**kwargs)

Create a Dask cluster with Coiled whose workers are set up to use Earth Engine.
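ClusterGEE's implementation is not shown here. As a rough illustration only, a Coiled cluster with an Earth Engine worker plugin could be assembled as follows. `start_ee_cluster` is a hypothetical helper, not this package's API. It assumes the keyword arguments are passed straight to `coiled.Cluster`, and that `plugin` is a `distributed.WorkerPlugin` instance such as one that initializes Earth Engine.

```python
def start_ee_cluster(plugin, **cluster_kwargs):
    """Hypothetical helper: start a Coiled cluster and register a worker plugin.

    `cluster_kwargs` go straight to coiled.Cluster (e.g. name, n_workers,
    worker_cpu, region). `plugin` is assumed to be a distributed.WorkerPlugin,
    for example one that calls ee.Initialize() on every worker.
    """
    import coiled  # imported here so defining the helper does not require Coiled

    cluster = coiled.Cluster(**cluster_kwargs)
    client = cluster.get_client()
    # The scheduler keeps registered worker plugins and runs their setup()
    # on the current workers and on workers that join later.
    client.register_plugin(plugin)
    return cluster
```

Passing the plugin in as an argument keeps the helper independent of any particular plugin class.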



InitEarthEngine

 InitEarthEngine (**kwargs)

InitEarthEngine is a Dask worker plugin. The description below is the docstring it inherits from distributed.WorkerPlugin:

Interface to extend the Worker.

A worker plugin enables custom code to run at different stages of a Worker's lifecycle. Whenever such an event happens, the corresponding method on this class is called. Note that the user code always runs within the Worker's main thread.

To implement a plugin:

  1. inherit from this class
  2. override some of its methods
  3. register the plugin using Client.register_plugin (distributed.Client.register_plugin).

The idempotent attribute controls whether the plugin is ignored upon registration if a worker plugin with the same name already exists. If True, the plugin is ignored; otherwise the existing plugin is replaced. Defaults to False.
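The three steps above can be sketched as a plugin that initializes Earth Engine when it is attached to a worker. This is not InitEarthEngine's actual implementation. The class name `EarthEngineInitSketch`, its `name` value, and the `project` parameter are hypothetical. The sketch assumes each worker has the earthengine-api and google-auth packages installed and can find Application Default Credentials.

```python
from distributed import WorkerPlugin


class EarthEngineInitSketch(WorkerPlugin):
    """Hypothetical worker plugin that initializes Earth Engine on each worker.

    Assumes Application Default Credentials are available on the worker VM
    and that earthengine-api and google-auth are installed there.
    """

    # A fixed name lets `idempotent` take effect: re-registering a plugin
    # with this name is ignored instead of replacing the existing one.
    name = "earth-engine-init"
    idempotent = True

    def __init__(self, project=None):
        # Optional GCP project; falls back to the ADC project in setup().
        self.project = project

    def setup(self, worker):
        # Called when the plugin is attached to a worker.
        import ee
        import google.auth

        credentials, project_id = google.auth.default()
        ee.Initialize(credentials=credentials, project=self.project or project_id)
```

It would be registered with `client.register_plugin(EarthEngineInitSketch(project="my-gcp-project"))`, where the project ID is a placeholder. The Earth Engine imports live inside `setup` so that only the workers need them when the plugin runs.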

Try it out

Create a cluster and run a few jobs.

Authenticate & Initialize Earth Engine

Get credentials and the GCP project ID, authenticating if necessary.

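import ee
import google.auth
import google.auth.exceptions

try:
    credentials, project_id = google.auth.default()
except google.auth.exceptions.DefaultCredentialsError:
    # No Application Default Credentials found: log in (IPython shell escape), then retry.
    !gcloud auth application-default login
    credentials, project_id = google.auth.default()

ee.Initialize(credentials=credentials, project=project_id)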
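The `!gcloud` line only works inside IPython or Jupyter. Outside a notebook, the same flow can be written with `subprocess`. This is a sketch that assumes the `gcloud` CLI is on the PATH. `init_earth_engine` is a hypothetical helper name.

```python
import subprocess


def init_earth_engine():
    """Hypothetical helper: authenticate with ADC and initialize Earth Engine.

    Plain-Python variant of the notebook cell: the IPython `!gcloud` escape is
    replaced with subprocess.run. Returns the GCP project ID that was used.
    """
    import ee
    import google.auth
    from google.auth.exceptions import DefaultCredentialsError

    try:
        credentials, project_id = google.auth.default()
    except DefaultCredentialsError:
        # Interactive login that writes Application Default Credentials locally.
        subprocess.run(["gcloud", "auth", "application-default", "login"], check=True)
        credentials, project_id = google.auth.default()

    ee.Initialize(credentials=credentials, project=project_id)
    return project_id
```

`google.auth.default()` returns `None` as the project ID when it cannot determine one. In that case you may need to pass an explicit project to `ee.Initialize`.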

Start Dask Cluster

Start up an Earth Engine-enabled cluster. This may take a few minutes.

cluster = ClusterGEE(
    name='test-class-cluster',
    n_workers=2,
    worker_cpu=8,
    region='us-central1',
)

Google Application Default Credentials have been written to a file on your Coiled VM(s).
These credentials will potentially be valid until explicitly revoked by running
gcloud auth application-default revoke

Retrieve a client for the cluster and display it.

client = cluster.get_client()
client

Client

Client-b3af9aca-1a13-11ef-9cc9-fe11494405b6

Connection method: Cluster object
Cluster type: __main__.ClusterGEE
Dashboard: https://cluster-ufyqp.dask.host/FqddnuVpTbRxtrlw/status

Cluster Info

Worker: test-class-cluster-worker-92d9293f4a

Comm: tls://10.2.0.11:40289
Total threads: 8
Dashboard: http://10.2.0.11:8787/status
Memory: 30.58 GiB
Nanny: tls://10.2.0.11:34949
Local directory: /scratch/dask-scratch-space/worker-o47xuth5
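Workers can still be starting when the client is first displayed. A minimal sketch that waits for a given number of workers before submitting jobs follows. `get_ready_client` is a hypothetical helper, and it assumes the cluster object exposes `get_client()` as used above.

```python
def get_ready_client(cluster, n_workers=2, timeout=600):
    """Hypothetical helper: return a client once `n_workers` workers have joined."""
    client = cluster.get_client()
    # Blocks until the scheduler reports at least n_workers workers,
    # raising an error if that takes longer than `timeout` seconds.
    client.wait_for_workers(n_workers=n_workers, timeout=timeout)
    print("Dashboard:", client.dashboard_link)
    return client
```

For example, `client = get_ready_client(cluster, n_workers=2)` matches the `n_workers=2` used when starting the cluster.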

Submit Jobs

Test it out by:

  - defining a function that can be distributed,
  - submitting jobs that run the function to the workers,
  - gathering the results locally, and
  - displaying the results.

# Get a list of countries to analyze.
country_fc = ee.FeatureCollection('USDOS/LSIB_SIMPLE/2017')
country_list = country_fc.aggregate_array('country_na').distinct().sort().getInfo()

# Write a function that can be run by the cluster workers. 
def get_country_stats(country_name):
    country = country_fc.filter(ee.Filter.eq('country_na', country_name))
    elev = ee.ImageCollection("COPERNICUS/DEM/GLO30").select('DEM').mosaic()
    return {
        'country': country_name, 
        'area_km2': country.geometry().area().multiply(1e-6).round().getInfo(), 
        'mean_elev': elev.reduceRegion(reducer=ee.Reducer.mean(),
                                       geometry=country.geometry(),
                                       scale=10000,
                                       ).get('DEM').getInfo(),
    }

# Create one job per country and submit the jobs to the workers.
submitted_jobs = [
    client.submit(get_country_stats, country)
    for country in ['Abyei Area', 'Zimbabwe']
]

# Gather up the results and display them.
results = client.gather(submitted_jobs)
results
[{'country': 'Abyei Area', 'area_km2': 10460, 'mean_elev': 402.5921903247955},
 {'country': 'Zimbabwe', 'area_km2': 391916, 'mean_elev': 973.2955548809969}]
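To scale beyond two countries, `client.map` submits one task per element. Passing `errors="skip"` to `client.gather` drops failed tasks instead of raising the first error. The following is a sketch; `stats_for_countries` is a hypothetical helper.

```python
def stats_for_countries(client, func, countries):
    """Hypothetical helper: run `func` on the cluster once per country.

    client.map submits one task per element of `countries`; gather with
    errors="skip" leaves out the results of tasks that raised.
    """
    futures = client.map(func, countries)
    return client.gather(futures, errors="skip")
```

For example, `stats_for_countries(client, get_country_stats, country_list[:20])` would process the first 20 countries. Because failed countries are silently dropped, compare the length of the result against the input if completeness matters.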

Display a scatter plot of the data.
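A minimal sketch of such a plot with matplotlib follows. It assumes matplotlib is installed and that `results` is the list of dicts gathered above. `scatter_points` and `plot_country_stats` are hypothetical helpers, not part of this package.

```python
def scatter_points(results):
    """Return (areas, elevations, labels), skipping rows with no elevation."""
    rows = [r for r in results if r.get("mean_elev") is not None]
    return (
        [r["area_km2"] for r in rows],
        [r["mean_elev"] for r in rows],
        [r["country"] for r in rows],
    )


def plot_country_stats(results):
    """Scatter plot of country area against mean elevation; returns the Axes."""
    import matplotlib.pyplot as plt

    areas, elevs, labels = scatter_points(results)
    fig, ax = plt.subplots()
    ax.scatter(areas, elevs)
    for x, y, label in zip(areas, elevs, labels):
        ax.annotate(label, (x, y), fontsize=8)
    ax.set_xscale("log")  # country areas span several orders of magnitude
    ax.set_xlabel("Area (km²)")
    ax.set_ylabel("Mean elevation (m)")
    return ax
```

Calling `plot_country_stats(results)` in the notebook draws the figure. Rows where Earth Engine returned no elevation are skipped rather than plotted.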

Shut down the cluster.

cluster.shutdown()
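If a job raises, the cell above never reaches `cluster.shutdown()` and the cloud VMs keep running. A `try`/`finally` wrapper avoids that. The following is a sketch; `run_on_cluster` is a hypothetical helper, and it assumes only the `get_client()` and `shutdown()` methods used on this page.

```python
def run_on_cluster(make_cluster, work):
    """Hypothetical helper: start a cluster, run `work(client)`, always shut down.

    `make_cluster` is a zero-argument callable that returns a cluster, for
    example lambda: ClusterGEE(name="test-class-cluster", n_workers=2).
    `work` receives the cluster's client. The finally block runs even if
    `work` raises, so the cluster is not left running after a failure.
    """
    cluster = make_cluster()
    try:
        return work(cluster.get_client())
    finally:
        cluster.shutdown()
```

Taking a factory rather than an existing cluster keeps startup and shutdown in one place.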