Deploying your own operational NRT system

Introduction

Monitoring algorithms implemented in the nrt python package are suitable for operational deployment. Setting up such a monitoring system requires at least some level of understanding of how the algorithms work as well as some IT skills; it would typically require the collaboration of a domain expert and an IT specialist. This page summarizes the main steps of the deployment process and provides useful snippets to facilitate it.

It assumes you have access to a constantly updated archive of Sentinel-2 L2A data indexed in a STAC catalogue and queryable via a STAC API.

The steps, which may differ depending on the specific objectives of your team, are:

  • Define the study area, CRS, and tiling strategy.

  • Obtain a forest mask.

  • Define the monitoring strategy and optimal parameters.

  • Fit the model(s) on a stable history period.

  • Fast forward (optional): Bring the monitoring state from the end of the historical period to the present day.

  • Set up periodic, automatic monitoring.

  • Publish the output.

  • Develop additional downstream services.

Study area, CRS, and tiling strategy

The size of the area you intend to monitor and your output format requirements will impact your choice of a Coordinate Reference System (CRS) and tiling strategy.

While a small area (e.g., less than 50x50 km) can be processed in a single chunk, it’s highly recommended to split larger areas into multiple tiles. This allows for parallel processing and easier management. You may choose to handle the entire process in the native CRS of the Sentinel data you’re using. However, if your study area crosses several UTM zones, it’s often easier to define a common CRS for all processing and warp all input data to that single grid.

Obtaining a forest mask

It’s highly recommended to use a forest mask to restrict monitoring to forested pixels. This avoids spurious anomaly detections in areas like agricultural fields or water bodies, which would add noise to the final product.

If a reliable tree cover layer is available for your area and matches your data’s resolution, you can use that. Alternatively, you can quickly generate a forest mask using the xinfereo Python package.

Here is an example of producing a forest mask using xinfereo:

import datetime

import xarray as xr
import numpy as np
import rasterio
from rasterio.crs import CRS
from affine import Affine
from pystac_client import Client
from odc.geo.geobox import GeoBox
from odc.stac import stac_load

from xinfereo import EOInferencer, data

# 1. Define the Study area via a GeoBox
gbox = GeoBox(
    (1000, 1000),
    Affine(20.0, 0.0, 3810180, 0.0, -20.0, 2517580),
    CRS.from_epsg(3035)
)

# 2. Query and load one year of Sentinel-2 data
date_range = [datetime.datetime(2024, 1, 1), datetime.datetime(2024, 12, 31)]
catalog = Client.open('https://your-stac-api.com/api')
query = catalog.search(
    collections=["EO.Copernicus.S2.L2A"], # Or your collection name
    bbox=gbox.geographic_extent.boundingbox,
    datetime=date_range,
    query={"eo:cloud_cover": {"lt": 50}}
)

# 3. Load the data into an xarray.Dataset
ds = stac_load(
    query.items(),
    bands=['B02_20', 'B03_20', 'B04_20', 'B05', 'B06',
           'B07', 'B8A', 'B11', 'B12'],
    resampling='cubic',
    geobox=gbox,
    groupby='solar_day',
    chunks={'time': 1}
)

# 4. Apply standard offset and scaling
ds = ds.where(ds != 0)
ds = (ds.astype(np.float32) - 1000) / 10000

# 5. Initialize the EOInferencer with a pre-trained model
infrs = EOInferencer(data.tcd_model_meta(), 'tcd_onnx_v0.2')

# 6. Run inference to predict tree cover density (returns a numpy array)
tcd_predictions = infrs.predict(ds)

# 7. Apply a threshold to create a binary forest/non-forest mask
forest_mask = (tcd_predictions[0] > 20).astype(np.uint8)

# 8. Save mask to disk for later use using metadata from the input xarray
transform = ds.rio.transform()
crs = ds.rio.crs
with rasterio.open(
    "forest_mask.tif",
    'w',
    driver='GTiff',
    height=forest_mask.shape[0],
    width=forest_mask.shape[1],
    count=1,
    dtype=forest_mask.dtype,
    crs=crs,
    transform=transform
) as dst:
    dst.write(forest_mask, 1)

Define monitoring strategy and optimal parameters

This step requires experimentation to find the best configuration for your specific use case. A non-exhaustive list of parameters to choose from is:

  • Vegetation Index (e.g., NDVI, NBR, NDMI)

  • Monitoring algorithm (CCDC, EWMA, CuSum, MoSum, and IQR are currently implemented in nrt)

  • Fitting parameters (e.g., fitting algorithm, outlier removal strategy)

  • Monitoring parameters (e.g., warming-up period, alert thresholds)

There are two main approaches to select the optimal parameters:

  1. Using reference data: This is the most objective method. If you have reference data from field surveys or visual interpretation, you can use them to quantitatively assess the performance of different parameter combinations. The nrt-validate package provides tools like the SegmentInterpreter to facilitate this process. See nrt-validate documentation.

  2. Simulation on a test area: A more qualitative approach is to run simulations on a well-known test area of manageable size. While less objective, this will give you a good intuitive feel for how well the results match the known disturbance patterns of the area.

Fit the stable history period for the chosen model(s)

Once you’ve chosen a model and its parameters, the first step is to fit it on a “stable” historical period. This period should contain representative, cloud-free observations of the “normal” forest phenology, without major disturbances.

The following snippet shows how to fit an IQR monitor with an NDVI time series from 2022 to 2024.

import datetime

import xarray as xr
import numpy as np
from rasterio.crs import CRS
from affine import Affine
from pystac_client import Client
from odc.geo.geobox import GeoBox
from odc.stac import stac_load
from nrt.monitor.iqr import IQR

# 1. Define the same study area and historical period
gbox = GeoBox(
    (1000, 1000),
    Affine(20.0, 0.0, 3810180, 0.0, -20.0, 2517580),
    CRS.from_epsg(3035)
)
history_period = [datetime.datetime(2022, 1, 1), datetime.datetime(2024, 12, 31)]

# 2. Query STAC for the historical data, including the SCL band for masking
catalog = Client.open('https://your-stac-api.com/api')
query = catalog.search(
    collections=["EO.Copernicus.S2.L2A"],
    bbox=gbox.geographic_extent.boundingbox,
    datetime=history_period,
    query={"eo:cloud_cover": {"lt": 80}} # Higher cloud cover is ok, we will mask it
)

# 3. Load data
ds = stac_load(
    query.items(),
    bands=['B04_20', 'B8A', 'SCL'],
    resampling={'B04_20': 'cubic',
                'B8A': 'cubic',
                'SCL': 'nearest'},
    geobox=gbox,
    groupby='solar_day',
    chunk={'time': 1}
)

# 4. Pre-process the data
ds = ds.where(ds != 0)
ds['B04_20'] = ds.B04_20 - 1000
ds['B8A'] = ds.B8A - 1000
# Mask clouds and non-vegetation pixels
ds = ds.where(ds.SCL.isin([4, 5, 6, 7]))
ds = ds.drop_vars('SCL')

# 5. Compute NDVI
ndvi = (ds.B8A - ds.B04_20) / (ds.B8A + ds.B04_20)
ndvi = ndvi.where(~np.isinf(ndvi), np.nan) # The above division may generate inf values which unlike nan are not supported by nrt's fit function
ndvi = ndvi.compute()

# 6. Initialize IQR monitor
with rasterio.open('forest_mask.tif') as src:
    mask = src.read(1)
model = IQR(mask=mask, trend=False, harmonic_order=2, sensitivity=2.5)

# 7. Fit the model on the historical NDVI data
model.fit(ndvi, method='RIRLS', maxiter=5)

# 8. Save the fitted state to disk for the next steps
model.to_netcdf("./data/iqr_ndvi_fitted_state.nc")

print("Fitting complete. State saved to disk.")

Fast forward

If the end of your historical period does not match today’s date, you may want to “fast forward” the monitoring state. This involves iteratively applying the monitor method for all observations between the end of the fit period and the present. This is also a good strategy for algorithms that require a “warm-up” period to perform optimally.

The following snippet loads the state from the previous step and updates it with data up to July 15, 2025.

import os
import datetime

import numpy as np
from pystac_client import Client
from odc.stac import stac_load
from nrt.monitor.iqr import IQR
# Assume gbox is defined or loaded from a configuration file

# 1. Configuration
STATE_FILE_INPUT = "./data/iqr_ndvi_fitted_state.nc"
STATE_FILE_OUTPUT = "./data/iqr_ndvi_current_state.nc"
REPORT_FILE = "./data/nrt_report.tif"
DATE_LOG_FILE = "./data/processed_dates.log"
FF_PERIOD = [datetime.datetime(2025, 1, 1), datetime.datetime(2025, 7, 16)]

# Note: Before running, you may want to manually remove the old DATE_LOG_FILE
# to ensure the fast-forward process starts with a clean slate.

# 2. Load the previously fitted monitor state
model = IQR.from_netcdf(STATE_FILE_INPUT)
print("Loaded fitted state from:", STATE_FILE_INPUT)

# 3. Query STAC for the fast-forward period
catalog = Client.open('https://your-stac-api.com/api')
query = catalog.search(
    collections=["EO.Copernicus.S2.L2A"],
    bbox=gbox.geographic_extent.boundingbox,
    datetime=FF_PERIOD
)
items = list(query.items())

if items:
    print(f"Found {len(items)} items for the fast-forward period.")

    # 4. Load data and calculate NDVI
    ds = stac_load(items,
                   bands=['B04_20', 'B8A', 'SCL'],
                   resampling={'B04_20': 'cubic',
                               'B8A': 'cubic',
                               'SCL': 'nearest'},
                   geobox=gbox,
                   groupby='solar_day',
                   chunks={'time': 1})
    ds = ds.where(ds != 0)
    ds['B04_20'] = ds.B04_20 - 1000
    ds['B8A'] = ds.B8A - 1000
    ds = ds.where(ds.SCL.isin([4, 5, 6, 7]))
    ds = ds.drop_vars('SCL')

    # 5. Compute NDVI
    ndvi = (ds.B8A - ds.B04_20) / (ds.B8A + ds.B04_20)
    ndvi = ndvi.where(~np.isinf(ndvi), np.nan)
    ndvi = ndvi.sortby('time')
    ndvi = ndvi.compute()

    # 5. Iteratively monitor each new observation and log the date
    for date in ndvi.time.values.astype('M8[s]').astype(datetime.datetime):
        ndvi_slice = ndvi.sel(time=date, method='nearest').values
        if not np.isnan(ndvi_slice).all():
            model.monitor(array=ndvi_slice, date=date)

            # For logging, convert the date to a simple string
            date_str = date.strftime('%Y-%m-%d')
            with open(DATE_LOG_FILE, 'a') as log:
                log.write(date_str + "\n")
            print(f"Processed and logged date: {date_str}")

    # 6. Generate the final report
    model.report(
        filename=REPORT_FILE,
        layers=['mask', 'detection_date'],
        dtype=np.uint8
    )
    print("Report generated at:", REPORT_FILE)

    # 7. Save the final, updated state to disk
    model.to_netcdf(STATE_FILE_OUTPUT)
    print("Fast forward complete. Final state saved to:", STATE_FILE_OUTPUT)

else:
    print("No new data found for the fast forward period.")

Automatic monitoring

After fitting and fast-forwarding, the system is ready for operational, automatic execution. This involves running a script periodically (e.g., daily) to check for new satellite imagery and update the monitoring state.

Monitoring Snippet

This generic script loads the current state, checks for new data since the last run, processes it, and saves the updated state and reports.

# operational_monitoring.py
import datetime

import os
import numpy as np
from pystac_client import Client
from odc.stac import stac_load
from nrt.monitor.iqr import IQR
# Assume gbox is defined or loaded from a config

def get_last_date_from_log(log_file, initial_date):
    """Reads the last date from the log file, returns initial_date if not found."""
    if not os.path.exists(log_file):
        return initial_date
    with open(log_file, 'r') as f:
        lines = f.readlines()
        if not lines:
            return initial_date
        return datetime.datetime.fromisoformat(lines[-1].strip())

def run_daily_monitoring():
    STATE_FILE = "./data/iqr_ndvi_current_state.nc"
    REPORT_FILE = "./data/nrt_report.tif"
    DATE_LOG_FILE = "./data/processed_dates.log"

    # The initial date should be the last day of your fitting/fast-forward period.
    # This is only used if the log file doesn't exist.
    initial_start_date = datetime.datetime(2025, 7, 15)

    # 1. Load the current monitoring state
    model = IQR.from_netcdf(STATE_FILE)

    # 2. Determine the date of the last observation from the log file
    last_date = get_last_date_from_log(DATE_LOG_FILE, initial_start_date)
    start_date = last_date + datetime.timedelta(days=1)
    end_date = datetime.datetime.now()

    print(f"Checking for new data from {start_date.date()} to {end_date.date()}")

    # 3. Query STAC for new data
    catalog = Client.open('https://your-stac-api.com/api')
    query = catalog.search(
        collections=["EO.Copernicus.S2.L2A"],
        bbox=gbox.geographic_extent.boundingbox,
        datetime=[start_date, end_date]
    )
    items = list(query.items())

    # 4. If new data is found, process it
    if not items:
        print("No new items since last update.")
        return

    print(f"Found {len(items)} new items. Processing...")
    # Same preprocessing as before
    ds = stac_load(items, bands=['B04_20', 'B8A', 'SCL'],
                   geobox=gbox,
                   groupby='solar_day',
                   chunks={'time': 1})
    ds = ds.where(ds != 0)
    ds['B04_20'] = ds.B04_20 - 1000
    ds['B8A'] = ds.B8A - 1000
    ds = ds.where(ds.SCL.isin([4, 5, 6, 7]))
    ds = ds.drop_vars('SCL')

    # 5. Compute NDVI
    ndvi = (ds.B8A - ds.B04_20) / (ds.B8A + ds.B04_20)
    ndvi = ndvi.where(~np.isinf(ndvi), np.nan)
    ndvi = ndvi.sortby('time')
    ndvi = ndvi.compute()

    # 5. Monitor, log date, report, and save
    new_dates_processed = False
    for date in ndvi.time.values.astype('M8[s]').astype(datetime.datetime):
        # Convert numpy.datetime64 to python datetime for logging
        date_str = date.strftime('%Y-%m-%d')
        ndvi_slice = ndvi.sel(time=date, method='nearest').values
        if not np.isnan(ndvi_slice).all():
            model.monitor(array=ndvi_slice, date=date)
            # Log the date immediately after it has been processed
            with open(DATE_LOG_FILE, 'a') as log:
                log.write(date_str + "\n")
            new_dates_processed = True

    # 6. Only update reports and state if new data was actually processed
    if new_dates_processed:
        model.report(filename=REPORT_FILE, layers=['mask', 'detection_date'], dtype=np.uint8)
        model.to_netcdf(STATE_FILE) # Overwrite the state file with the updated version
        print("Monitoring update complete.")
    else:
        print("No new valid observations found to process.")

if __name__ == "__main__":
    run_daily_monitoring()

Automatic execution setup

A common approach for automating this script is using CRON to schedule its execution within a Docker container. This ensures a consistent and isolated environment. The specific method for scheduling and orchestration, however, is typically a decision for the IT specialist or system administrator managing the deployment.

  1. Dockerfile: Create a Dockerfile to build an image with all necessary dependencies.

FROM python:3.10-slim
WORKDIR /app
COPY . .
RUN pip install --no-cache-dir nrt odc-stac odc-geo pystac-client rioxarray xarray
CMD ["python", "operational_monitoring.py"]
  1. CRON Job: Add a line to your crontab to run the Docker container daily.

# Edit crontab with: crontab -e

# Run the monitoring script every day at 2:00 AM
0 2 * * * docker run --rm -v /path/to/your/data:/app/data your-nrt-image-name

This command tells CRON to run your Docker container at 2 AM. The -v flag mounts your local data directory into the container, allowing the script to read the state file and write the outputs.

Publish the output

Once you have your report products (e.g., disturbance mask, detection date), you’ll want to make them accessible.

  • The Web Map Service (WMS) protocol is a good option for serving the outputs as map layers that can be consumed by GIS software (QGIS, ArcGIS) or web clients.

  • If you adopted a tiling strategy, creating a VRT (Virtual Raster) is a convenient way to mosaic all tiles into a single virtual layer. This allows individual tiles to be updated without needing to rewrite the entire mosaic, making the update process much faster.

Considerations

Note

Logging: It is crucial to implement comprehensive logging in your operational scripts. This will help you debug issues when the script runs unattended.

Note

State Backups: The operational script overwrites the state file on each successful run. It’s wise to implement a backup strategy (e.g., renaming the old state file with a timestamp before saving the new one) to prevent a failed run from corrupting your only state file.

Note

Filesystem Stability: Depending on the architecture you are deploying on (e.g., HPC, cloud), writing directly to a distributed or network file system can be risky. An interrupted process could leave files in a corrupted or incomplete state. A more stable pattern is to write all outputs to a temporary local disk first (e.g., /scratch). Once all processing is complete, a single, atomic move operation can transfer the final products to the persistent storage.

Note

Parallel Processing with Dask: For large-scale deployments, the data loading strategy presented above may be inneficient. Dask provides a powerful solution for parallel and out-of-core (lazy) computation. When using odc.stac.stac_load, you can pass a chunks argument (e.g., chunks={‘x’: 1024, ‘y’: 1024}). This instructs stac_load to return a Dask array instead of a NumPy array, allowing data to be loaded, processed, and written in parallel blocks.