"""Main OGM API classes and constructors"""
import asyncio
import collections
import logging
import weakref
import aiogremlin
from aiogremlin.driver.protocol import Message
from aiogremlin.driver.resultset import ResultSet
from aiogremlin.process.graph_traversal import __
from gremlin_python.driver.remote_connection import RemoteTraversal
from gremlin_python.process.traversal import Binding, Cardinality, Traverser
from gremlin_python.structure.graph import Edge, Vertex
from goblin import exception, mapper
from goblin.element import GenericEdge, GenericVertex, VertexProperty
from goblin.manager import VertexPropertyManager
logger = logging.getLogger(__name__)
[docs]def bindprop(element_class, ogm_name, val, *, binding=None):
"""
Helper function for binding ogm properties/values to corresponding db
properties/values for traversals.
:param goblin.element.Element element_class: User defined element class
:param str ogm_name: Name of property as defined in the ogm
:param val: The property value
:param str binding: The binding for val (optional)
:returns: tuple object ('db_property_name', ('binding(if passed)', val))
"""
db_name = getattr(element_class, ogm_name, ogm_name)
_, data_type = element_class.__mapping__.ogm_properties[ogm_name]
val = data_type.to_db(val)
if binding:
val = (binding, val)
return db_name, val
[docs]class Session:
"""
Provides the main API for interacting with the database. Does not
necessarily correpsond to a database session. Don't instantiate directly,
instead use :py:meth:`Goblin.session<goblin.app.Goblin.session>`.
:param goblin.app.Goblin app:
:param aiogremlin.driver.connection.Connection conn:
"""
def __init__(self, app, remote_connection, get_hashable_id):
self._app = app
self._remote_connection = remote_connection
self._loop = self._app._loop
self._use_session = False
self._pending = collections.deque()
self._current = dict()
self._get_hashable_id = get_hashable_id
self._graph = aiogremlin.Graph()
@property
def graph(self):
return self._graph
@property
def app(self):
return self._app
@property
def remote_connection(self):
return self._remote_connection
@property
def current(self):
return self._current
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
self.close()
[docs] def close(self):
"""
"""
self._remote_connection = None
self._app = None
# Traversal API
@property
def g(self):
"""
Get a simple traversal source.
:returns:
`gremlin_python.process.GraphTraversalSource`
object
"""
return self.traversal()
@property
def _g(self):
"""
Traversal source for internal use. Uses undelying conn. Doesn't
trigger complex deserailization.
"""
return self.graph.traversal().withRemote(self.remote_connection)
[docs] def traversal(self, element_class=None):
"""
Generate a traversal using a user defined element class as a
starting point.
:param goblin.element.Element element_class: An optional element
class that will dictate the element type (vertex/edge) as well as
the label for the traversal source
:returns: `aiogremlin.process.graph_traversal.AsyncGraphTraversal`
"""
traversal = self.graph.traversal().withRemote(self)
if element_class:
label = element_class.__mapping__.label
if element_class.__type__ == 'vertex':
traversal = traversal.V()
if element_class.__type__ == 'edge':
traversal = traversal.E()
traversal = traversal.hasLabel(label)
return traversal
[docs] async def submit(self, bytecode):
"""
Submit a query to the Gremiln Server.
:param str gremlin: Gremlin script to submit to server.
:param dict bindings: A mapping of bindings for Gremlin script.
:returns:
`gremlin_python.driver.remove_connection.RemoteTraversal`
object
"""
await self.flush()
remote_traversal = await self.remote_connection.submit(bytecode)
traversers = remote_traversal.traversers
side_effects = remote_traversal.side_effects
result_set = ResultSet(traversers.request_id, traversers._timeout,
self._loop)
self._loop.create_task(self._receive(traversers, result_set))
return RemoteTraversal(result_set, side_effects)
async def _receive(self, traversers, result_set):
try:
async for result in traversers:
result = await self._deserialize_result(result)
msg = Message(200, result, '')
result_set.queue_result(msg)
except Exception as e:
msg = Message(500, None, e.args[0])
result_set.queue_result(msg)
finally:
result_set.queue_result(None)
async def _deserialize_result(self, result):
if isinstance(result, Traverser):
bulk = result.bulk
obj = result.object
if isinstance(obj, (Vertex, Edge)):
hashable_id = self._get_hashable_id(obj.id)
current = self.current.get(hashable_id, None)
if isinstance(obj, Vertex):
# why doesn't this come in on the vertex?
label = await self._g.V(obj.id).label().next()
if not current:
current = self.app.vertices.get(label, GenericVertex)()
props = await self._get_vertex_properties(obj.id, label)
if isinstance(obj, Edge):
props = await self._g.E(obj.id).valueMap(True).next()
if not current:
current = self.app.edges.get(
props.get('label'), GenericEdge)()
current.source = GenericVertex()
current.target = GenericVertex()
element = current.__mapping__.mapper_func(obj, props, current)
self.current[hashable_id] = element
return Traverser(element, bulk)
else:
return result
# Recursive serialization is broken in goblin
elif isinstance(result, dict):
for key in result:
result[key] = self._deserialize_result(result[key])
return result
elif isinstance(result, list):
return [self._deserialize_result(item) for item in result]
else:
return result
async def _get_vertex_properties(self, vid, label):
projection = self._g.V(vid).properties() \
.project('id', 'key', 'value', 'meta') \
.by(__.id()).by(__.key()).by(__.value()) \
.by(__.valueMap())
props = await projection.toList()
new_props = {'label': label, 'id': vid}
for prop in props:
key = prop['key']
val = prop['value']
# print('val_type', type(val))
meta = prop['meta']
new_props.setdefault(key, [])
if meta:
meta['key'] = key
meta['value'] = val
meta['id'] = prop['id']
val = meta
new_props[key].append(val)
return new_props
# Creation API
[docs] def add(self, *elements):
"""
Add elements to session pending queue.
:param goblin.element.Element elements: Elements to be added
"""
for elem in elements:
self._pending.append(elem)
[docs] async def flush(self):
"""
Issue creation/update queries to database for all elements in the
session pending queue.
"""
while self._pending:
elem = self._pending.popleft()
await self.save(elem)
[docs] async def remove_vertex(self, vertex):
"""
Remove a vertex from the db.
:param goblin.element.Vertex vertex: Vertex to be removed
"""
traversal = self._g.V(Binding('vid', vertex.id)).drop()
result = await self._simple_traversal(traversal, vertex)
hashable_id = self._get_hashable_id(vertex.id)
if hashable_id in self.current:
vertex = self.current.pop(hashable_id)
else:
msg = 'Vertex {} does not belong to this session obj {}'.format(
vertex, self)
logger.warning(msg)
del vertex
return result
[docs] async def remove_edge(self, edge):
"""
Remove an edge from the db.
:param goblin.element.Edge edge: Element to be removed
"""
eid = edge.id
if isinstance(eid, dict):
eid = Binding('eid', edge.id)
traversal = self._g.E(eid).drop()
result = await self._simple_traversal(traversal, edge)
hashable_id = self._get_hashable_id(edge.id)
if hashable_id in self.current:
edge = self.current.pop(hashable_id)
else:
msg = 'Edge {} does not belong to this session obj {}'.format(
edge, self)
logger.warning(msg)
del edge
return result
[docs] async def save(self, elem):
"""
Save an element to the db.
:param goblin.element.Element element: Vertex or Edge to be saved
:returns: :py:class:`Element<goblin.element.Element>` object
"""
if elem.__type__ == 'vertex':
result = await self.save_vertex(elem)
elif elem.__type__ == 'edge':
result = await self.save_edge(elem)
else:
raise exception.ElementError("Unknown element type: {}".format(
elem.__type__))
return result
[docs] async def save_vertex(self, vertex):
"""
Save a vertex to the db.
:param goblin.element.Vertex element: Vertex to be saved
:returns: :py:class:`Vertex<goblin.element.Vertex>` object
"""
result = await self._save_element(
vertex, self._check_vertex, self._add_vertex, self._update_vertex)
hashable_id = self._get_hashable_id(result.id)
self.current[hashable_id] = result
return result
[docs] async def save_edge(self, edge):
"""
Save an edge to the db.
:param goblin.element.Edge element: Edge to be saved
:returns: :py:class:`Edge<goblin.element.Edge>` object
"""
if not (hasattr(edge, 'source') and hasattr(edge, 'target')):
raise exception.ElementError(
"Edges require both source/target vertices")
result = await self._save_element(edge, self._check_edge,
self._add_edge, self._update_edge)
hashable_id = self._get_hashable_id(result.id)
self.current[hashable_id] = result
return result
[docs] async def get_vertex(self, vertex):
"""
Get a vertex from the db. Vertex must have id.
:param goblin.element.Vertex element: Vertex to be retrieved
:returns: :py:class:`Vertex<goblin.element.Vertex>` | None
"""
return await self.g.V(Binding('vid', vertex.id)).next()
[docs] async def get_edge(self, edge):
"""
Get a edge from the db. Edge must have id.
:param goblin.element.Edge element: Edge to be retrieved
:returns: :py:class:`Edge<goblin.element.Edge>` | None
"""
eid = edge.id
if isinstance(eid, dict):
eid = Binding('eid', edge.id)
return await self.g.E(eid).next()
async def _update_vertex(self, vertex):
"""
Update a vertex, generally to change/remove property values.
:param goblin.element.Vertex vertex: Vertex to be updated
:returns: :py:class:`Vertex<goblin.element.Vertex>` object
"""
props = mapper.map_props_to_db(vertex, vertex.__mapping__)
traversal = self._g.V(Binding('vid', vertex.id))
return await self._update_vertex_properties(vertex, traversal, props)
async def _update_edge(self, edge):
"""
Update an edge, generally to change/remove property values.
:param goblin.element.Edge edge: Edge to be updated
:returns: :py:class:`Edge<goblin.element.Edge>` object
"""
props = mapper.map_props_to_db(edge, edge.__mapping__)
eid = edge.id
if isinstance(eid, dict):
eid = Binding('eid', edge.id)
traversal = self._g.E(eid)
return await self._update_edge_properties(edge, traversal, props)
# *metodos especiales privados for creation API
async def _simple_traversal(self, traversal, element):
elem = await traversal.next()
if elem:
if element.__type__ == 'vertex':
# Look into this
label = await self._g.V(elem.id).label().next()
props = await self._get_vertex_properties(elem.id, label)
elif element.__type__ == 'edge':
props = await self._g.E(elem.id).valueMap(True).next()
elem = element.__mapping__.mapper_func(elem, props, element)
return elem
async def _save_element(self, elem, check_func, create_func, update_func):
if hasattr(elem, 'id'):
exists = await check_func(elem)
if not exists:
result = await create_func(elem)
else:
result = await update_func(elem)
else:
result = await create_func(elem)
return result
async def _add_vertex(self, vertex):
"""Convenience function for generating crud traversals."""
props = mapper.map_props_to_db(vertex, vertex.__mapping__)
traversal = self._g.addV(vertex.__mapping__.label)
return await self._add_properties(traversal, props, vertex)
async def _add_edge(self, edge):
"""Convenience function for generating crud traversals."""
props = mapper.map_props_to_db(edge, edge.__mapping__)
traversal = self._g.V(Binding('sid', edge.source.id))
traversal = traversal.addE(edge.__mapping__._label)
traversal = traversal.to(__.V(Binding('tid', edge.target.id)))
return await self._add_properties(traversal, props, edge)
async def _check_vertex(self, vertex):
"""Used to check for existence, does not update session vertex"""
msg = await self._g.V(Binding('vid', vertex.id)).next()
return msg
async def _check_edge(self, edge):
"""Used to check for existence, does not update session edge"""
eid = edge.id
if isinstance(eid, dict):
eid = Binding('eid', edge.id)
return await self._g.E(eid).next()
async def _update_vertex_properties(self, vertex, traversal, props):
await self._g.V(vertex.id).properties().drop().iterate()
return await self._add_properties(traversal, props, vertex)
async def _update_edge_properties(self, edge, traversal, props):
await self._g.E(edge.id).properties().drop().iterate()
return await self._add_properties(traversal, props, edge)
async def _add_properties(self, traversal, props, elem):
binding = 0
for card, db_name, val, metaprops in props:
if not metaprops:
metaprops = {}
if val is not None:
key = ('k' + str(binding), db_name)
val = ('v' + str(binding), val)
if card:
# Maybe use a dict here as a translator
if card == Cardinality.list_:
card = Cardinality.list_
elif card == Cardinality.set_:
card = Cardinality.set_
else:
card = Cardinality.single
metas = [
j
for i in zip(metaprops.keys(), metaprops.values())
for j in i
]
traversal = traversal.property(card, key, val, *metas)
else:
metas = [
j
for i in zip(metaprops.keys(), metaprops.values())
for j in i
]
traversal = traversal.property(key, val, *metas)
binding += 1
return await self._simple_traversal(traversal, elem)