import logging
from goblin._compat import (
array_types, integer_types, float_types, string_types, add_metaclass)
from goblin import connection
from goblin.exceptions import (
ElementDefinitionException, GoblinQueryError, ValidationError)
from goblin.gremlin import GremlinMethod
from .element import Element, ElementMetaClass, edge_types
logger = logging.getLogger(__name__)
@add_metaclass(EdgeMetaClass)
[docs]class Edge(Element):
"""Base class for all edges."""
# __metaclass__ = EdgeMetaClass
__abstract__ = True
# if set to True, no more than one edge will
# be created between two vertices
__exclusive__ = False
label = None
gremlin_path = 'edge.groovy'
_save_edge = GremlinMethod()
_delete_edge = GremlinMethod()
_get_edges_between = GremlinMethod(classmethod=True)
_find_edge_by_value = GremlinMethod(classmethod=True)
FACTORY_CLASS = None
# edge id
# edge_id = columns.UUID(save_strategy=columns.SAVE_ONCE)
def __init__(self, outV, inV, **values):
"""
Initialize this edge with the outgoing and incoming vertices as well as
edge properties.
:param outV: The vertex this edge is coming out of
:type outV: Vertex
:param inV: The vertex this edge is going into
:type inV: Vertex
:param values: The properties for this edge
:type values: dict
"""
self._outV = outV
self._inV = inV
super(Edge, self).__init__(**values)
def __repr__(self):
return "{}(label={}, id={}, values={})".format(
self.__class__.__name__, self.__class__.get_label(),
getattr(self, '_id', None), getattr(self, '_values', {}))
def __getstate__(self):
state = {u'_id': self.id,
u'_type': u'edge',
u'_outV': str(self.outV().id),
u'_inV': str(self.inV().id),
u'_label': self.get_label(),
u'_properties': self.as_save_params()}
return state
def __setstate__(self, state):
data = self.translate_db_fields(state)
self.__init__(state['_outV'], state['_inV'], **data)
return self
@classmethod
[docs] def find_by_value(cls, field, value, as_dict=False, **kwargs):
"""
Returns edges that match the given field/value pair.
:param field: The field to search
:type field: str
:param value: The value of the field
:type value: str
:param as_dict: Return results as a dictionary
:type as_dict: boolean
:rtype: [goblin.models.Edge]
"""
_field = cls.get_property_by_name(field)
_label = cls.get_label()
value_type = False
if isinstance(value, integer_types + float_types):
value_type = True
future = connection.get_future(kwargs)
future_results = cls._find_edge_by_value(
value_type=value_type,
elabel=_label,
field=_field,
val=value
)
def by_value_handler(data):
if data is None:
data = []
if as_dict: # pragma: no cover
data = {v._id: v for v in data}
return data
def on_find_by_value(f):
try:
stream = f.result()
except Exception as e:
future.set_exception(e)
else:
stream.add_handler(by_value_handler)
future.set_result(stream)
future_results.add_done_callback(on_find_by_value)
return future
@classmethod
[docs] def all(cls, ids, as_dict=False, *args, **kwargs):
"""
Load all edges with the given edge_ids from the graph. By default this
will return a list of edges but if as_dict is True then it will return
a dictionary containing edge_ids as keys and edges found as values.
:param ids: A list of titan IDs
:type ids: list
:param as_dict: Toggle whether to return a dictionary or list
:type as_dict: boolean
:rtype: dict | list
"""
if not isinstance(ids, array_types):
raise GoblinQueryError("ids must be of type list or tuple")
# strids = [str(i) for i in ids]
def edge_handler(results):
try:
results = list(filter(None, results))
except TypeError:
raise cls.DoesNotExist
if len(results) != len(ids):
raise GoblinQueryError(
"the number of results don't match the number of edge " +
"ids requested")
objects = []
for r in results:
try:
objects += [Element.deserialize(r)]
except KeyError: # pragma: no cover
raise GoblinQueryError('Edge type "%s" is unknown' % '')
if as_dict: # pragma: no cover
return {e._id: e for e in objects}
return objects
return connection.execute_query("g.E(*eids)",
bindings={'eids': ids},
handler=edge_handler,
**kwargs)
@classmethod
[docs] def get_label(cls):
"""
Returns the label for this edge.
:rtype: str
"""
return cls._type_name(cls.label)
@classmethod
[docs] def get_between(cls, outV, inV, page_num=None, per_page=None):
"""
Return all the edges with a given label between two vertices.
:param outV: The vertex the edge comes out of.
:type outV: Vertex
:param inV: The vertex the edge goes into.
:type inV: Vertex
:param page_num: The page number of the results
:type page_num: int
:param per_page: The number of results per page
:type per_page: int
:rtype: list
"""
return cls._get_edges_between(out_v=outV,
in_v=inV,
elabel=cls.get_label(),
page_num=page_num,
per_page=per_page)
[docs] def validate(self):
"""
Perform validation of this edge raising a ValidationError if any
problems are encountered.
"""
if self._id is None:
if self._inV is None:
raise ValidationError(
'in vertex must be set before saving new edges')
if self._outV is None:
raise ValidationError(
'out vertex must be set before saving new edges')
super(Edge, self).validate()
[docs] def save(self, *args, **kwargs):
"""
Save this edge to the graph database.
"""
super(Edge, self).save()
future = connection.get_future(kwargs)
future_result = self._save_edge(self._outV,
self._inV,
self.get_label(),
self.as_save_params(),
exclusive=self.__exclusive__,
**kwargs)
def on_read(f2):
try:
result = f2.result()[0]
except Exception as e:
future.set_exception(e)
else:
future.set_result(result)
def on_save(f):
try:
stream = f.result()
except Exception as e:
future.set_exception(e)
else:
future_read = stream.read()
future_read.add_done_callback(on_read)
future_result.add_done_callback(on_save)
return future
def _reload_values(self, *args, **kwargs):
""" Re-read the values for this edge from the graph database. """
reloaded_values = {}
future = connection.get_future(kwargs)
future_result = connection.execute_query(
'g.E(eid)', {'eid': self._id}, **kwargs)
def on_read(f2):
try:
result = f2.result()
result = result.data[0]
except Exception as e:
future.set_exception(e)
else:
if result:
# del result['type']
reloaded_values['id'] = result['id']
for name, value in result.get('properties', {}).items():
reloaded_values[name] = value
if result['id']:
setattr(self, 'id', result['id'])
future.set_result(reloaded_values)
else:
future.set_result({})
def on_reload(f):
try:
stream = f.result()
except Exception as e:
future.set_exception(e)
else:
future_read = stream.read()
future_read.add_done_callback(on_read)
future_result.add_done_callback(on_reload)
return future
@classmethod
[docs] def get(cls, id, *args, **kwargs):
"""
Look up edge by titan assigned ID. Raises a DoesNotExist exception if
an edge with the given edge id was not found. Raises a
MultipleObjectsReturned exception if the edge_id corresponds to more
than one edge in the graph.
:param id: The titan assigned ID
:type id: str | basestring
:rtype: goblin.models.Edge
"""
if not id:
raise cls.DoesNotExist
future = connection.get_future(kwargs)
future_result = cls.all([id], **kwargs)
def on_read(f2):
try:
result = f2.result()
except Exception as e:
future.set_exception(e)
else:
if len(result) > 1: # pragma: no cover
# This requires titan to be broken.
e = cls.MultipleObjectsReturned
future.set_exception(e)
else:
result = result[0]
if not isinstance(result, cls):
e = cls.WrongElementType(
'%s is not an instance or subclass of %s' % (
result.__class__.__name__, cls.__name__))
future.set_exception(e)
else:
future.set_result(result)
def on_get(f):
try:
stream = f.result()
except Exception as e:
future.set_exception(e)
else:
future_read = stream.read()
future_read.add_done_callback(on_read)
future_result.add_done_callback(on_get)
return future
@classmethod
[docs] def create(cls, outV, inV, label=None, *args, **kwargs):
"""
Create a new edge of the current type coming out of vertex outV and
going into vertex inV with the given properties.
:param outV: The vertex the edge is coming out of
:type outV: Vertex
:param inV: The vertex the edge is going into
:type inV: Vertex
"""
edge = super(Edge, cls).create(outV, inV, *args, **kwargs)
return edge
[docs] def delete(self, **kwargs):
"""
Delete the current edge from the graph.
"""
if self.__abstract__: # pragma: no cover
raise GoblinQueryError('cant delete abstract elements')
if self._id is None:
return self
future = connection.get_future(kwargs)
future_result = self._delete_edge()
def on_read(f2):
try:
result = f2.result()
except Exception as e:
future.set_exception(e)
else:
future.set_result(result)
def on_delete(f):
try:
stream = f.result()
except Exception as e:
future.set_exception(e)
else:
future_read = stream.read()
future_read.add_done_callback(on_read)
future_result.add_done_callback(on_delete)
return future
def _simple_traversal(self, operation, *args, **kwargs):
"""
Perform a simple traversal starting from the current edge returning a
list of results.
:param operation: The operation to be performed
:type operation: str
:rtype: list
"""
deserialize = kwargs.pop('deserialize', True)
def edge_traversal_handler(data):
if deserialize:
data = [Element.deserialize(d) for d in data]
return data
future_results = connection.execute_query(
'g.e(id).%s()' % operation, {'id': self.id},
handler=edge_traversal_handler, **kwargs)
return
[docs] def inV(self, *args, **kwargs):
"""
Return the vertex that this edge goes into.
:rtype: Vertex
"""
from goblin.models.vertex import Vertex
future = connection.get_future(kwargs)
if self._inV is None:
future_results = self._simple_traversal('inV', **kwargs)
def on_traversal(f):
try:
result = f.result()
except Exception as e:
future.set_exception(e)
else:
self._inV = result[0]
if isinstance(self._inV, string_types + integer_types):
future_results = Vertex.get(self._inV, **kwargs)
def on_get(f2):
try:
result = f2.result()
except Exception as e:
future.set_exception(e)
else:
self._inV = result
future_results.add_done_callback(on_get)
future.set_result(self._inV)
future_results.add_done_callback(on_traversal)
elif isinstance(self._inV, string_types + integer_types):
future_results = Vertex.get(self._inV, **kwargs)
def on_get(f2):
try:
result = f2.result()
except Exception as e:
future.set_exception(e)
else:
self._inV = result
future.set_result(self._inV)
future_results.add_done_callback(on_get)
else:
future.set_result(self._inV)
return future
[docs] def outV(self, *args, **kwargs):
"""
Return the vertex that this edge goes into.
:rtype: Vertex
"""
from goblin.models.vertex import Vertex
future = connection.get_future(kwargs)
if self._inV is None:
future_results = self._simple_traversal('outV', **kwargs)
def on_traversal(f):
try:
result = f.result()
except Exception as e:
future.set_exception(e)
else:
self._outV = result[0]
if isinstance(self._outV, string_types + integer_types):
future_results = Vertex.get(self._outV, **kwargs)
def on_get(f2):
try:
result = f2.result()
except Exception as e:
future.set_exception(e)
else:
self._outV = result
future_results.add_done_callback(on_get)
future.set_result(self._outV)
future_results.add_done_callback(on_traversal)
elif isinstance(self._outV, string_types + integer_types):
future_results = Vertex.get(self._outV, **kwargs)
def on_get(f2):
try:
result = f2.result()
except Exception as e:
future.set_exception(e)
else:
self._outV = result
future.set_result(self._outV)
future_results.add_done_callback(on_get)
else:
future.set_result(self._outV)
return future