Skip to content

Instantly share code, notes, and snippets.

@bryanluman
Created August 5, 2014 20:52
Show Gist options
  • Select an option

  • Save bryanluman/e7d782a75c34e8d2b561 to your computer and use it in GitHub Desktop.

Select an option

Save bryanluman/e7d782a75c34e8d2b561 to your computer and use it in GitHub Desktop.
Mini-implementation of ArcSDE I made for ArcGIS 9 when ArcPy really sucked
class FeatureClass(object):
def __init__(self, conn, database, schema, table_name):
self.conn = conn
self.database = database
self.schema = schema
self.table_name = table_name
self._set_registration_id()
self._set_geometry_table()
@property
def sql_name(self):
return ".".join([self.database, self.schema, self.table_name])
@property
def geometry_sql_name(self):
return ".".join([self.database, self.schema, self.geometry_table_name])
@property
def adds_table(self):
return "%s.%s.a%d" % (self.database, self.schema, self.registration_id)
@property
def deletes_table(self):
return "%s.%s.D%d" % (self.database, self.schema, self.registration_id)
def _set_registration_id(self):
cursor = self.conn.cursor()
cursor.execute("""SELECT registration_id
FROM sde.SDE_table_registry
WHERE database_name = '%s'
AND table_name = '%s'""" % (self.database, self.table_name))
row = cursor.fetchone()
self.registration_id = row.registration_id
def _set_geometry_table(self):
cursor = self.conn.cursor()
cursor.execute("""SELECT f_geometry_column, g_table_name, f_table_schema
FROM sde.sde.SDE_geometry_columns
WHERE f_table_catalog = '%s'
AND f_table_name = '%s'""" % (self.database, self.table_name))
row = cursor.fetchone()
self.geometry_table_name = row.g_table_name
def get_textanno_in_extent(self, extent, version):
cursor = self.conn.cursor()
shape_ids = self._shape_ids_in_extent(extent)
if len(shape_ids) == 0:
return []
default_text = self._textvalues_in_default_version_matching_shapes(shape_ids)
text_pairs = {}
if version.name == 'DEFAULT':
for row in default_text:
text_pairs[row.TEXTVALUE] = row.TEXTVALUE2
else:
deleted_ids = self._deleted_object_ids(version)
added_text = self._added_textvalues_matching_shapes(shape_ids, version)
for row in default_text:
if not row.OBJECTID in deleted_ids:
text_pairs[row.TEXTVALUE] = row.TEXTVALUE2
for row in added_text:
text_pairs[row.TEXTVALUE] = row.TEXTVALUE2
text_list = [ [k,text_pairs[k]] for k in text_pairs.keys() ]
return text_list
def object_ids_in_extent(self, extent, version):
cursor = self.conn.cursor()
shape_ids = self._shape_ids_in_extent(extent)
if len(shape_ids) == 0:
return []
default_ids = self._ids_in_default_version_matching_shapes(shape_ids)
if version.name == 'DEFAULT':
return default_ids
else:
deleted_ids = self._deleted_object_ids(version)
added_ids = self._added_object_ids_matching_shapes(shape_ids, version)
ids_in_version = (set(default_ids) - set(deleted_ids)).union(set(added_ids))
return list(ids_in_version)
def _ids_in_default_version_matching_shapes(self, shape_ids):
cursor = self.conn.cursor()
shape_ids_sql = ", ".join(map(str, set(shape_ids)))
cursor.execute("""SELECT DISTINCT(OBJECTID)
FROM %s
WHERE Shape IN (%s)""" % (self.sql_name, shape_ids_sql))
return [ row.OBJECTID for row in cursor.fetchall() ]
def _textvalues_in_default_version_matching_shapes(self, shape_ids):
cursor = self.conn.cursor()
shape_ids_sql = ", ".join(map(str, set(shape_ids)))
cursor.execute("""SELECT OBJECTID, TEXTVALUE, TEXTVALUE2
FROM %s
WHERE Shape IN (%s)
ORDER BY TEXTVALUE""" % (self.sql_name, shape_ids_sql))
return [ row for row in cursor.fetchall() ]
def _shape_ids_in_extent(self, extent):
cursor = self.conn.cursor()
cursor.execute(""" SELECT fid
FROM %s
WHERE eminx > %s
AND eminy > %s
AND emaxx < %s
AND emaxy < %s""" % (self.geometry_sql_name, extent.xmin,
extent.ymin, extent.xmax, extent.ymax))
return [ row.fid for row in cursor.fetchall() ]
def _deleted_object_ids(self, version):
cursor = self.conn.cursor()
cursor.execute("""SELECT DISTINCT(SDE_DELETES_ROW_ID)
FROM %s
WHERE DELETED_AT = '%d'""" % (self.deletes_table, version.state_id))
return [ row[0] for row in cursor.fetchall() ]
def _added_object_ids_matching_shapes(self, shape_ids, version):
cursor = self.conn.cursor()
shape_ids_sql = ", ".join(map(str, set(shape_ids)))
cursor.execute("""SELECT OBJECTID, Shape
FROM %s
WHERE Shape in (%s)
AND SDE_STATE_ID = %d""" % (self.adds_table, shape_ids_sql, version.state_id) )
return [ row[0] for row in cursor.fetchall() ]
def _added_textvalues_matching_shapes(self, shape_ids, version):
cursor = self.conn.cursor()
shape_ids_sql = ", ".join(map(str, set(shape_ids)))
cursor.execute("""SELECT OBJECTID, TEXTVALUE, TEXTVALUE2, Shape
FROM %s
WHERE Shape in (%s)
AND SDE_STATE_ID = %d
ORDER BY TEXTVALUE""" % (self.adds_table, shape_ids_sql, version.state_id) )
return [ row for row in cursor.fetchall() ]
class Version(object):
def __init__(self, conn, name):
self.conn = conn
self.name = name
self._set_state_id()
def _set_state_id(self):
cursor = self.conn.cursor()
cursor.execute("SELECT state_id FROM sde.SDE_versions WHERE name = '%s'" % (self.name))
row = cursor.fetchone()
self.state_id = row.state_id
@bryanluman
Copy link
Copy Markdown
Author

Wrote this years ago, slightly proud, slightly horrified.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment