I ran it on the Blue River Watershed in Oregon and it took it about 5 days to complete. The input DEM was 7084x5882 pixels and the search kernel looked out 2,500m. The main innovation relative to other implementations I have seen is I created a kernel with concentric rings to scan the landscape for maximum ridge angles within 8 horizontal direction classes. The example below has 50 concentric rings which are separated by 50m. The DEM was at a 5m resolution so I went with a relatively fine spacing for the concentric rings.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#------------------------------------------------------------------------------- | |
# Name: Calc_TOPEX.py | |
# Purpose: This program will calculate the vertical angle for a given direction and distance | |
# on a given DEM. The maximum angle is calculated for each of 8 directions and summed. A greater angle | |
# equates to less exposure. | |
# | |
# Author: olsenk | |
# | |
# Created: 17/02/2014 | |
# Copyright: (c) olsenk 2014 | |
# Licence: <your licence> | |
#------------------------------------------------------------------------------- | |
#!/usr/bin/env python | |
import arcpy | |
import numpy as np | |
#from scipy.ndimage import filters | |
from math import atan2, atan, degrees | |
nodataVal = -999 | |
# funciton to construct a circular kernal from a given radius | |
def buildAzimuthKernel(radiusMeters, referenceDesc): | |
kernel = [] | |
# build kernel from radius and input raster cell size | |
window = int(radiusMeters / referenceDesc.meanCellHeight) | |
for row in range(-window, window + 1): | |
for col in range(-window, window + 1): | |
Xdist = abs(col * referenceDesc.meanCellHeight) | |
Ydist = abs(row * referenceDesc.meanCellHeight) | |
totalDist = pow(Xdist**2 + Ydist**2, 0.5) | |
if (totalDist > 0) and (totalDist <= radiusMeters) and (totalDist % 50 == 0): | |
angleDeg = degrees(atan2(col,row)) | |
if (angleDeg > -22.5 and angleDeg < 22.5): | |
# north | |
azClass = 0 | |
elif (angleDeg >= 22.5 and angleDeg < 67.5): | |
# northeast | |
azClass = 1 | |
elif (angleDeg >= 67.5 and angleDeg < 112.5): | |
# east | |
azClass = 2 | |
elif (angleDeg >= 112.5 and angleDeg < 157.5): | |
# southeast | |
azClass = 3 | |
elif (angleDeg >= 157.5 or angleDeg < -157.5): | |
# south | |
azClass = 4 | |
elif (angleDeg >= -157.5 and angleDeg < -112.5): | |
# southwest | |
azClass = 5 | |
elif (angleDeg >= -112.5 and angleDeg < -67.5): | |
azClass = 6 | |
else: | |
azClass = 7 | |
kernel.append([row,col,int(totalDist + 0.5),azClass]) | |
print str(len(kernel)) + " pixels in kernel" | |
return(kernel) | |
# ------------------------------------------------------------------------------- | |
def main(): | |
demImage = "T:\\groups\\clib\\spatial\\HJA\\phy\\dem5m_merge_b" | |
outRasterName = "N:\\projects\\HJA\\topex\\hja_topex" | |
outTestRasterName = "N:\\projects\\HJA\\topex\\hja_topex_s" | |
logFile = open("N:\\projects\\HJA\\topex\\topexLog.txt", 'w') | |
# read DEM into memory | |
referenceDesc = arcpy.Describe(demImage) | |
lowerLeftCorner = arcpy.Point(referenceDesc.Extent.XMin + (referenceDesc.meanCellWidth / 2), referenceDesc.Extent.YMin + (referenceDesc.meanCellHeight / 2)) | |
myArray = arcpy.RasterToNumPyArray(demImage, lowerLeftCorner, referenceDesc.width, referenceDesc.height, nodataVal) | |
demImage = np.ma.masked_values(np.rint(myArray), nodataVal) | |
# set workspaces | |
arcpy.env.workspace = "c:\\temp" | |
arcpy.env.scratchWorkspace = "c:\\temp" | |
# pad DEM with nodataVal to avoid boundary issues | |
## pad = np.ma.zeros((500,referenceDesc.width), dtype=np.int32) | |
## demImage = np.ma.vstack((pad, demImage, pad)) | |
## referenceDesc.height += 1000 | |
## pad = np.ma.zeros((referenceDesc.height, 500), dtype=np.int32) | |
## demImage = np.ma.hstack((pad, demImage, pad)) | |
## referenceDesc.width += 1000 | |
kernel = buildAzimuthKernel(2500, referenceDesc) | |
# create empty image to hold topex values | |
topexImage = np.ma.zeros((referenceDesc.height, referenceDesc.width), dtype=np.float32) | |
topexImage.mask = demImage.mask | |
# run topex function over image | |
# NOTE: input DEM must have nodata buffer same width as kernel radius to avoid using boundary if statements (below) | |
## i = 0 | |
for y in xrange(3001,referenceDesc.height): | |
for x in xrange(referenceDesc.width): | |
if (demImage.data[y,x] != nodataVal): | |
exposure = [0,0,0,0,0,0,0,0] | |
for pixelOffset in kernel: | |
# if ((y + pixelOffset[0]) >= 0 and (y + pixelOffset[0]) < referenceDesc.height and (x + pixelOffset[1]) >= 0 and (x + pixelOffset[1]) < referenceDesc.width): | |
if (demImage.data[y + pixelOffset[0], x + pixelOffset[1]] != nodataVal): | |
exposeRatio = (demImage.data[y + pixelOffset[0],x + pixelOffset[1]] - demImage.data[y,x]) / (pixelOffset[2]) | |
if exposeRatio > exposure[pixelOffset[3]]: | |
exposure[pixelOffset[3]] = exposeRatio | |
# set topex values to sum of 8 direction exposure degree maximums | |
exposure = np.degrees(np.arctan(exposure)) | |
topexImage.data[y,x] = sum(exposure) | |
# testImage.data[y,x] = exposureAz[0] | |
print "Processing row: " + str(y) | |
logFile.write("Processing row: " + str(y) + "\n") | |
## if (y % 1000 == 0): | |
## i += 1 | |
## outRasterNameNew = outRasterName + str(i) | |
## try: | |
## lowerLeftCorner = arcpy.Point(referenceDesc.Extent.XMin, referenceDesc.Extent.YMin) | |
## logFile.write("Saving numpy array to raster " + str(i) + "\n") | |
## rasterToSave = arcpy.NumPyArrayToRaster(topexImage.filled(nodataVal), lowerLeftCorner, referenceDesc.meanCellWidth, referenceDesc.meanCellHeight, nodataVal) | |
## rasterToSave.save(outRasterNameNew) | |
## | |
## # define the projection based on the reference raster | |
## arcpy.DefineProjection_management(outRasterNameNew, referenceDesc.spatialReference) | |
## except Exception, e: | |
## print "\n\n" + e.args[0] | |
## | |
## except: | |
## print "unhandled Error!!" | |
# convert the masked array to normal numpy array and fill the masked area with nodataVal | |
try: | |
logFile.write("Saving numpy array to raster\n") | |
rasterToSave = arcpy.NumPyArrayToRaster(topexImage.filled(nodataVal), lowerLeftCorner, referenceDesc.meanCellWidth, referenceDesc.meanCellHeight, nodataVal) | |
rasterToSave.save(outRasterName) | |
# define the projection based on the reference raster | |
arcpy.DefineProjection_management(outRasterName, referenceDesc.spatialReference) | |
# arcpy.BuildRasterAttributeTable_management(rasterOutputName, "Overwrite") | |
except Exception, e: | |
print "\n\n" + e.args[0] | |
except: | |
print "unhandled Error!!" | |
logFile.close() | |
print "done." | |
if __name__ == '__main__': | |
main() |