Source code for ultipa.utils.format

import copy
import json
from typing import List

from ultipa.proto.ultipa_pb2 import AttrListData
from ultipa.structs import Property
from ultipa.types.types import ResultType
from ultipa.utils import errors
from ultipa.types import ULTIPA, ULTIPA_RESPONSE, ULTIPA_REQUEST
from ultipa.utils.errors import ParameterException
from ultipa.utils.serialize import _Serialize
from ultipa.utils.typeCheck import TypeCheck


class HasDataMap:
    """Plain holder for the three data-presence flags of an attribute result."""

    has_ultipa_data: bool
    has_attr_data: bool
    only_attr_list: bool

    def __init__(self, has_ultipa_data, has_attr_data, only_attr_list):
        """Store the three flags unchanged on the instance."""
        (
            self.has_ultipa_data,
            self.has_attr_data,
            self.only_attr_list,
        ) = (has_ultipa_data, has_attr_data, only_attr_list)
class FormatResponse:
    """Post-processing helpers for already decoded ``ULTIPA.Table`` objects."""

    @staticmethod
    def resTableToArray(table: ULTIPA.Table):
        """Rebuild *table* with its headers converted to plain dicts.

        Each header becomes ``{"property_name", "property_type"}`` where the
        type is rendered through ``Property._getStringByPropertyType``.  A
        header without a name whose type is ``PROPERTY_NULL`` is renamed to
        ``null(<index>)`` (the header object itself is updated).

        :param table: decoded table, or a falsy value
        :return: a new ``ULTIPA.Table``, or ``None`` when *table* is falsy
        :raises ParameterException: wrapping any error raised while converting
        """
        # Explicit early exit: the result variable used to be referenced
        # without ever being assigned on this path.
        if not table:
            return None
        try:
            headers = []
            for index, header in enumerate(table.headers):
                if (not header.property_name
                        and header.property_type == ULTIPA.PropertyType.PROPERTY_NULL):
                    header.property_name = f"null({index})"
                headers.append({
                    "property_name": header.property_name,
                    "property_type": Property._getStringByPropertyType(header.property_type),
                })
            return ULTIPA.Table(table_name=table.name, headers=headers, table_rows=table.rows)
        except Exception as e:
            raise errors.ParameterException(e) from e

    @staticmethod
    def formatStatisticsTable(table: ULTIPA.Table):
        """Fold a statistics table into a ``ULTIPA.UltipaStatistics`` object.

        Every cell is converted with ``int`` and stored under its column name;
        when several rows are present, later rows overwrite earlier ones.

        :param table: decoded statistics table, or a falsy value
        :return: the statistics object, or ``None`` when *table* is falsy
        :raises ParameterException: wrapping any error raised while converting
        """
        if not table:
            return None
        try:
            item = {}
            for row in table.rows:
                for index, cell in enumerate(row):
                    item[table.headers[index].property_name] = int(cell)
            return ULTIPA.UltipaStatistics(edge_affected=item.get("edge_affected"),
                                           node_affected=item.get("node_affected"),
                                           total_time_cost=item.get("total_time_cost"),
                                           engine_time_cost=item.get("engine_time_cost"))
        except Exception as e:
            raise errors.ParameterException(e) from e
class DataMerge:
    """Helpers for merging two partial results into one."""

    # Maps a ``concat`` key to the attribute that holds the payload list on
    # each alias object.
    _ALIAS_FIELDS = {'nodes': 'nodes', 'edges': 'edges', 'values': 'values', 'arrays': 'elements'}

    @staticmethod
    def arrayDiff(arr1: "[str]", arr2: "[str]"):
        """Return the items of *arr2* that do not occur in *arr1*, in order."""
        return [item for item in arr2 if item not in arr1]

    @staticmethod
    def customizer(objValue, srcValue, key, ignoreKeys):
        """Decide how two values stored under *key* are combined.

        :return: *objValue* when *key* is ignored, the concatenation when
            *objValue* is a list, otherwise ``None`` (no opinion)
        """
        if ignoreKeys and key in ignoreKeys:
            return objValue
        if isinstance(objValue, list):
            return objValue + srcValue
        return None

    @staticmethod
    def merge(o1: object, o2: object, ignoreKeys: [str] = None):
        """Merge two dicts into a new dict.

        For a key that is truthy in both inputs the result of ``customizer``
        is used; when the customizer has no opinion (``None``) the value of
        *o2* wins.  Truthy values that only *o1* has are kept, and keys not
        yet present in the result are copied from *o2*.

        Previously the values of *o2* were written over the result after the
        customizer had run, which discarded the concatenated lists.
        """
        if ignoreKeys is None:
            ignoreKeys = []
        retDict = {}
        for key in o1:
            left = o1.get(key)
            right = o2.get(key)
            if left and right:
                combined = DataMerge.customizer(left, right, key, ignoreKeys)
                retDict[key] = right if combined is None else combined
            elif left:
                retDict[key] = left
        for key, value in o2.items():
            retDict.setdefault(key, value)
        return retDict

    @staticmethod
    def _mergeByAlias(target: list, incoming: list, field: str):
        """Merge alias objects of *incoming* into *target* on their ``alias``.

        Objects whose alias already exists in *target* have their *field*
        list appended to the existing entry; all others are appended as-is.
        """
        originals = list(target)
        known = [entry.alias for entry in originals]
        for item in incoming:
            if item.alias not in known:
                target.append(item)
                continue
            for entry in originals:
                if entry.alias == item.alias:
                    payload = getattr(entry, field)
                    payload += getattr(item, field)
                    setattr(entry, field, payload)
        return target

    @staticmethod
    def _mergeTables(target: list, incoming: list):
        """Merge tables of *incoming* into *target* on their ``name``."""
        originals = list(target)
        known = [table.name for table in originals]
        for table in incoming:
            if table.name in known:
                for existing in originals:
                    if existing.name == table.name:
                        existing.rows += table.rows
            else:
                # Copy the row list so later merges into this table do not
                # grow the rows of the table it was created from.
                target.append(ULTIPA.Table(table_name=table.name, table_rows=list(table.rows),
                                           headers=table.headers))
        return target

    @staticmethod
    def concat(arr1: list, arr2: list, bykey: str = None, ignoreKeys: "[str]" = None):
        """Merge *arr2* into *arr1* (in place) and return *arr1*.

        :param bykey: ``'table_name'`` merges tables on ``name``; ``'nodes'``,
            ``'edges'``, ``'values'`` and ``'arrays'`` merge alias objects on
            ``alias``; anything else falls back to a plain extend/update
        :param ignoreKeys: accepted for compatibility; not used
        :return: *arr1* for list/list and dict/dict inputs, otherwise ``None``

        Fixes over the previous version: entries that only exist in *arr2*
        are no longer merged with themselves (which doubled their payload),
        the ``'edges'`` key merges existing aliases instead of returning
        early, and the dict case returns the updated dict rather than the
        ``None`` produced by ``dict.update``.
        """
        bothLists = isinstance(arr1, list) and isinstance(arr2, list)
        if bykey and bothLists:
            if bykey == 'table_name':
                return DataMerge._mergeTables(arr1, arr2)
            field = DataMerge._ALIAS_FIELDS.get(bykey)
            if field is not None:
                return DataMerge._mergeByAlias(arr1, arr2, field)
        if bothLists:
            arr1.extend(arr2)
            return arr1
        if isinstance(arr1, dict) and isinstance(arr2, dict):
            arr1.update(arr2)
            return arr1
        return None
[docs] class FormatType:
@staticmethod
def checkProperty(row, schemaProperties):
    """Resolve every key of ``row.values`` to a ``Property`` of the schema.

    :param row: object exposing ``values`` (mapping), ``schema`` and ``_getIndex()``
    :param schemaProperties: iterable of mappings with ``name`` and ``type``
    :return: one ``Property`` per key of ``row.values``, in key order
    :raises ParameterException: when a key has no matching schema property
    """
    resolved = []
    for propName in row.values.keys():
        candidates = [entry for entry in schemaProperties if entry.get('name') == propName]
        if not candidates:
            raise ParameterException(
                err=f"row [{row._getIndex()}] error: schema[{row.schema}] doesn't contain property [{propName}].")
        matched = candidates[0]
        reqProperty = Property(matched.get("name"))
        reqProperty.setTypeInt(matched.get("type"))
        resolved.append(reqProperty)
    return resolved
@staticmethod
def checkNodeRow(i: int, schema: ULTIPA_REQUEST.Schema, row: ULTIPA.EntityRow):
    """Validate a node row against *schema*.

    :param i: fallback row index used in messages when ``row._index`` is None
    :param schema: schema whose ``properties`` the row must satisfy
    :param row: row exposing ``_index`` and ``values``
    :raises ParameterException: when the row is missing, its value count
        differs from the property count, or a non-id, non-ignored property
        has no value
    """
    # The null test must come first: reading row._index on a missing row
    # raised AttributeError and made this message unreachable.
    if row is None:
        raise ParameterException(err=f"The row [{i}] data is null")
    if row._index is not None:
        i = row._index
    if row.values is None:
        raise ParameterException(err=f"node row [{i}] error: values are empty but properties size > 0.")
    if not schema.properties and row.values:
        raise ParameterException(err=f"node row [{i}] error: properties are empty but values size > 0.")

    # A schema without a property list is treated like an empty one.
    properties = schema.properties or []
    valueCount = len(row.values.keys())
    if valueCount > len(properties):
        raise ParameterException(err=f"node row [{i}] error: values size larger than properties size.")
    if valueCount < len(properties):
        raise ParameterException(err=f"node row [{i}] error: values size smaller than properties size.")

    for proper in properties:
        if proper.isIdType() or proper.isIgnore():
            continue
        if proper.name not in row.values:
            raise ParameterException(
                err=f"node row [{i}] error: values doesn't contain property [{proper.name}].")
@staticmethod
def checkEdgeRow(i: int, schema: ULTIPA_REQUEST.Schema, row: ULTIPA.EntityRow):
    """Validate an edge row against *schema*.

    Besides the value/property checks, the endpoints must be given as a
    complete pair: ``from_id`` with ``to_id`` and/or ``from_uuid`` with
    ``to_uuid``.

    :param i: fallback row index used in messages when ``row._index`` is None
    :param schema: schema whose ``properties`` the row must satisfy
    :param row: row exposing ``_index``, ``values`` and the four endpoint fields
    :raises ParameterException: on the first violated rule
    """
    # The null test must come first: reading row._index on a missing row
    # raised AttributeError and made this message unreachable.
    if row is None:
        raise ParameterException(err=f"The row [{i}] data is null")
    if row._index is not None:
        i = row._index
    if row.values is None:
        raise ParameterException(err=f"edge row [{i}] error: values are empty but properties size > 0.")
    if not schema.properties and row.values:
        raise ParameterException(err=f"edge row [{i}] error: properties are empty but values size > 0.")

    # A schema without a property list is treated like an empty one.
    properties = schema.properties or []
    valueCount = len(row.values.keys())
    if valueCount > len(properties):
        raise ParameterException(err=f"edge row [{i}] error: values size larger than properties size.")
    if valueCount < len(properties):
        raise ParameterException(err=f"edge row [{i}] error: values size smaller than properties size.")

    fromIdMissing = row.from_id is None or row.from_id == ""
    toIdMissing = row.to_id is None or row.to_id == ""
    if fromIdMissing and toIdMissing and row.from_uuid is None and row.to_uuid is None:
        raise ParameterException(err=f"row [{i}] error: _from/_from_uuid and _to/_to_uuid are null.")
    if not fromIdMissing and toIdMissing:
        raise ParameterException(err=f"row [{i}] error: _from has value [{row.from_id}] but _to got null.")
    if fromIdMissing and not toIdMissing:
        raise ParameterException(err=f"row [{i}] error: _to has value [{row.to_id}] but _from got null.")
    if row.from_uuid is None and row.to_uuid is not None:
        raise ParameterException(
            err=f"row [{i}] error: _to_uuid has value [{row.to_uuid}] but _from_uuid got null.")
    if row.from_uuid is not None and row.to_uuid is None:
        raise ParameterException(
            err=f"row [{i}] error: _from_uuid has value [{row.from_uuid}] but _to_uuid got null.")

    for proper in properties:
        if proper.isIdType() or proper.isIgnore():
            continue
        if proper.name not in row.values:
            raise ParameterException(
                err=f"edge row [{i}] error: values doesn't contain property [{proper.name}].")
@staticmethod
def checkEntityNodeRow(i: int, schema: ULTIPA_REQUEST.Schema, row: ULTIPA.EntityRow):
    """Validate a node entity row against *schema*.

    :param i: fallback row index used in messages when ``row._index`` is None
    :param schema: schema whose ``properties`` the row must satisfy
    :param row: row exposing ``_index`` and ``values``
    :raises ParameterException: when the row is missing, its value count
        differs from the property count, or a non-id, non-ignored property
        has no value
    """
    # The null test must come first: reading row._index on a missing row
    # raised AttributeError and made this message unreachable.
    if row is None:
        raise ParameterException(err=f"The row [{i}] data is null")
    if row._index is not None:
        i = row._index
    if row.values is None:
        raise ParameterException(err=f"node row [{i}] error: values are empty but properties size > 0.")
    if not schema.properties and row.values:
        raise ParameterException(err=f"node row [{i}] error: properties are empty but values size > 0.")

    # A schema without a property list is treated like an empty one.
    properties = schema.properties or []
    valueCount = len(row.values.keys())
    if valueCount > len(properties):
        raise ParameterException(err=f"node row [{i}] error: values size larger than properties size.")
    if valueCount < len(properties):
        raise ParameterException(err=f"node row [{i}] error: values size smaller than properties size.")

    for proper in properties:
        if proper.isIdType() or proper.isIgnore():
            continue
        if proper.name not in row.values:
            raise ParameterException(
                err=f"node row [{i}] error: values doesn't contain property [{proper.name}].")
@staticmethod
def checkEntityEdgeRow(i: int, schema: ULTIPA_REQUEST.Schema, row: ULTIPA.EntityRow):
    """Validate an edge entity row against *schema*.

    Besides the value/property checks, the endpoints must be given as a
    complete pair: ``from_id`` with ``to_id`` and/or ``from_uuid`` with
    ``to_uuid``.

    :param i: fallback row index used in messages when ``row._index`` is None
    :param schema: schema whose ``properties`` the row must satisfy
    :param row: row exposing ``_index``, ``values`` and the four endpoint fields
    :raises ParameterException: on the first violated rule
    """
    # The null test must come first: reading row._index on a missing row
    # raised AttributeError and made this message unreachable.
    if row is None:
        raise ParameterException(err=f"The row [{i}] data is null")
    if row._index is not None:
        i = row._index
    if row.values is None:
        raise ParameterException(err=f"edge row [{i}] error: values are empty but properties size > 0.")
    if not schema.properties and row.values:
        raise ParameterException(err=f"edge row [{i}] error: properties are empty but values size > 0.")

    # A schema without a property list is treated like an empty one.
    properties = schema.properties or []
    valueCount = len(row.values.keys())
    if valueCount > len(properties):
        raise ParameterException(err=f"edge row [{i}] error: values size larger than properties size.")
    if valueCount < len(properties):
        raise ParameterException(err=f"edge row [{i}] error: values size smaller than properties size.")

    fromIdMissing = row.from_id is None or row.from_id == ""
    toIdMissing = row.to_id is None or row.to_id == ""
    if fromIdMissing and toIdMissing and row.from_uuid is None and row.to_uuid is None:
        raise ParameterException(err=f"row [{i}] error: _from/_from_uuid and _to/_to_uuid are null.")
    if not fromIdMissing and toIdMissing:
        raise ParameterException(err=f"row [{i}] error: _from has value [{row.from_id}] but _to got null.")
    if fromIdMissing and not toIdMissing:
        raise ParameterException(err=f"row [{i}] error: _to has value [{row.to_id}] but _from got null.")
    if row.from_uuid is None and row.to_uuid is not None:
        raise ParameterException(
            err=f"row [{i}] error: _to_uuid has value [{row.to_uuid}] but _from_uuid got null.")
    if row.from_uuid is not None and row.to_uuid is None:
        raise ParameterException(
            err=f"row [{i}] error: _from_uuid has value [{row.from_uuid}] but _to_uuid got null.")

    for proper in properties:
        if proper.isIdType() or proper.isIgnore():
            continue
        if proper.name not in row.values:
            raise ParameterException(
                err=f"edge row [{i}] error: values doesn't contain property [{proper.name}].")
# status(_status, host=None): convert a reply status into a ULTIPA.Status built
# from _status.error_code and _status.msg.
#
# When _status.cluster_info is truthy, or the error code equals
# ULTIPA.Code.NOT_RAFT_MODE, a ULTIPA.ClusterInfo is attached as
# statusData.clusterInfo. It is built from cluster_info.redirect, the peer list
# and the leader peer:
#   * the leader peer is created from cluster_info.leader_address, falling back
#     to `host`; its isAlgoExecutable flag is set to the NOT_RAFT_MODE test and
#     it is also the first entry of the peer list;
#   * every entry of cluster_info.followers becomes a ULTIPA.RaftPeerInfo whose
#     `status` is `foll.status == 1`; the role bit tests
#     (ROLE_ALGO_EXECUTABLE, ROLE_READABLE) are only applied when that status
#     is true, otherwise both flags stay False.
#
# NOTE(review): the original indentation is lost on this line, so it cannot be
# determined here whether `peer.isUnset = ...` belongs to the `if status:` body
# or runs for every follower - confirm against the upstream file before
# reformatting.
# NOTE(review): if only the NOT_RAFT_MODE test is true, the code still reads
# attributes of _status.cluster_info - presumably it is always an object; verify.
[docs] @staticmethod def status(_status, host: str = None) -> ULTIPA.Status: statusData = ULTIPA.Status(code=_status.error_code, message=_status.msg) isNotRaftMode = _status.error_code == ULTIPA.Code.NOT_RAFT_MODE if _status.cluster_info or isNotRaftMode: leaderPeer = None _followrs = [] if _status.cluster_info.leader_address or host: leaderPeer = ULTIPA.RaftPeerInfo(_status.cluster_info.leader_address or host, True, True, False, False, False) leaderPeer.isAlgoExecutable = isNotRaftMode or False _followrs.append(leaderPeer) for foll in _status.cluster_info.followers: status = foll.status == 1 peer = ULTIPA.RaftPeerInfo(foll.address) peer.status = status peer.isLeader = False peer.isAlgoExecutable = False peer.isFollowerReadable = False if status: peer.isAlgoExecutable = foll.role & ULTIPA.FollowerRole.ROLE_ALGO_EXECUTABLE and True or False peer.isFollowerReadable = foll.role & ULTIPA.FollowerRole.ROLE_READABLE and True or False peer.isUnset = foll.role == ULTIPA.FollowerRole.ROLE_UNSET _followrs.append(peer) clusterInfo = ULTIPA.ClusterInfo(_status.cluster_info.redirect, _followrs, leader=leaderPeer) statusData.clusterInfo = clusterInfo return statusData
@staticmethod
def getRaftStatus(data: dict) -> ULTIPA.Status:
    """Build a ``ULTIPA.Status`` (with cluster info) from a plain dict.

    Reads the keys ``code``, ``message``, ``redirectHost``,
    ``followersPeerInfos`` and ``leaderInfos``; missing keys become ``None``.
    """
    lookup = data.get
    result = ULTIPA.Status(code=lookup("code"), message=lookup("message"))
    result.clusterInfo = ULTIPA.ClusterInfo(
        redirect=lookup("redirectHost"),
        raftPeers=lookup("followersPeerInfos"),
        leader=lookup("leaderInfos"),
    )
    return result
@staticmethod
def table(_table, timeZone, timeZoneOffset) -> ULTIPA.Table:
    """Decode a raw table into a ``ULTIPA.Table`` with unserialized cells.

    Each cell is decoded with the property type of the header at the same
    position; when the table has no headers, ``PROPERTY_STRING`` is used.
    """
    name = _table.table_name
    headers = list(_table.headers)

    def decodeCell(position, raw):
        # Header type by column position, string type when there are no headers.
        if len(headers) > 0:
            cellType = headers[position].property_type
        else:
            cellType = ULTIPA.PropertyType.PROPERTY_STRING
        decoder = _Serialize(type=cellType, value=raw, timeZone=timeZone,
                             timeZoneOffset=timeZoneOffset)
        return decoder.unserialize()

    rows = [
        [decodeCell(position, raw) for position, raw in enumerate(rawRow.values)]
        for rawRow in _table.table_rows
    ]
    return ULTIPA.Table(table_name=name, headers=headers, table_rows=rows)
@staticmethod
def tables(_tables, timeZone, timeZoneOffset) -> List[ULTIPA.Table]:
    """Decode every raw table and normalise its headers.

    :return: one entry per raw table; an empty list when *_tables* is falsy
    """
    if not _tables:
        return []
    converted = []
    for raw in _tables:
        decoded = FormatType.table(raw, timeZone, timeZoneOffset)
        converted.append(FormatResponse.resTableToArray(table=decoded))
    return converted
@staticmethod
def statistics(_table, timeZone, timeZoneOffset) -> ULTIPA.UltipaStatistics:
    """Decode a raw statistics table; return ``None`` when it is falsy."""
    if _table:
        decoded = FormatType.table(_table, timeZone, timeZoneOffset)
        return FormatResponse.formatStatisticsTable(table=decoded)
    return None
@staticmethod
def nodeTable(nodeTableData, timeZone, timeZoneOffset):
    """Decode ``nodeTableData.node_table`` into a list of ``ULTIPA.Node``.

    The schemas of the table give, per schema name and column position, the
    property type, name and sub types used to decode each row value.  Rows
    flagged ``is_null`` yield ``None`` in the result.
    """
    typesBySchema = {}
    namesBySchema = {}
    subTypesBySchema = {}
    for schemaDef in nodeTableData.node_table.schemas:
        columnTypes, columnNames, columnSubTypes = {}, {}, {}
        for position, prop in enumerate(schemaDef.properties):
            columnTypes[position] = prop.property_type
            columnNames[position] = prop.property_name
            columnSubTypes[position] = prop.sub_types
        typesBySchema[schemaDef.schema_name] = columnTypes
        namesBySchema[schemaDef.schema_name] = columnNames
        subTypesBySchema[schemaDef.schema_name] = columnSubTypes

    decodedRows = []
    for rawRow in nodeTableData.node_table.entity_rows:
        if rawRow.is_null is True:
            decodedRows.append(None)
            continue
        rowId = rawRow.id
        rowUuid = rawRow.uuid
        rowSchema = rawRow.schema_name
        properties = {}
        if rawRow.values:
            for position, raw in enumerate(rawRow.values):
                decoder = _Serialize(type=typesBySchema.get(rowSchema).get(position), value=raw,
                                     subTypes=subTypesBySchema.get(rowSchema).get(position),
                                     timeZone=timeZone, timeZoneOffset=timeZoneOffset)
                properties[namesBySchema.get(rowSchema).get(position)] = decoder.unserialize()
        decodedRows.append(ULTIPA.Node(id=rowId, values=properties, schema_name=rowSchema, uuid=rowUuid))
    return decodedRows
@staticmethod
def edgeTable(edgeTableData, timeZone, timeZoneOffset):
    """Decode ``edgeTableData.edge_table`` into a list of ``ULTIPA.Edge``.

    The schemas of the table give, per schema name and column position, the
    property type, name and sub types used to decode each row value.  Rows
    flagged ``is_null`` yield ``None`` in the result.
    """
    typesBySchema = {}
    namesBySchema = {}
    subTypesBySchema = {}
    for schemaDef in edgeTableData.edge_table.schemas:
        columnTypes, columnNames, columnSubTypes = {}, {}, {}
        for position, prop in enumerate(schemaDef.properties):
            columnTypes[position] = prop.property_type
            columnNames[position] = prop.property_name
            columnSubTypes[position] = prop.sub_types
        typesBySchema[schemaDef.schema_name] = columnTypes
        namesBySchema[schemaDef.schema_name] = columnNames
        subTypesBySchema[schemaDef.schema_name] = columnSubTypes

    decodedRows = []
    for rawRow in edgeTableData.edge_table.entity_rows:
        if rawRow.is_null is True:
            decodedRows.append(None)
            continue
        fromUuid = rawRow.from_uuid
        fromId = rawRow.from_id
        toUuid = rawRow.to_uuid
        toId = rawRow.to_id
        rowSchema = rawRow.schema_name
        rowUuid = rawRow.uuid
        properties = {}
        if rawRow.values:
            for position, raw in enumerate(rawRow.values):
                decoder = _Serialize(type=typesBySchema.get(rowSchema).get(position), value=raw,
                                     subTypes=subTypesBySchema.get(rowSchema).get(position),
                                     timeZone=timeZone, timeZoneOffset=timeZoneOffset)
                properties[namesBySchema.get(rowSchema).get(position)] = decoder.unserialize()
        decodedRows.append(ULTIPA.Edge(from_id=fromId, to_id=toId, values=properties, uuid=rowUuid,
                                       from_uuid=fromUuid, to_uuid=toUuid, schema_name=rowSchema))
    return decodedRows
@staticmethod
def attrEntity(entityRows, type: str):
    """Wrap each row's uuid in a value-less ``ULTIPA.Node`` or ``ULTIPA.Edge``.

    :param entityRows: object exposing ``entity_rows``
    :param type: ``"node"`` selects nodes; any other value selects edges
    """
    return [
        ULTIPA.Node(values=None, uuid=rawRow.uuid) if type == "node"
        else ULTIPA.Edge(values=None, uuid=rawRow.uuid)
        for rawRow in entityRows.entity_rows
    ]
@staticmethod
def propertyType(type: str) -> ULTIPA.PropertyType:
    '''
    Convert the type input by the user to ULTIPA.PropertyType

    The name is matched case-insensitively, with or without the
    ``PROPERTY_`` prefix.  Unrecognised names are returned unchanged.

    :param type: type name such as ``"string"`` or ``"PROPERTY_INT32"``
    :return: the matching ``ULTIPA.PropertyType`` member, or *type* itself
    '''
    prefix = 'PROPERTY_'
    shortNames = ('STRING', 'INT32', 'INT64', 'FLOAT', 'DOUBLE', 'UINT32', 'UINT64',
                  'DATETIME', 'TIMESTAMP', 'TEXT', 'UUID', 'ID', 'FROM', 'FROM_UUID',
                  'TO', 'TO_UUID', 'IGNORE')
    upperName = type.upper()
    # Strip the prefix at most once so only the two accepted spellings match.
    shortName = upperName[len(prefix):] if upperName.startswith(prefix) else upperName
    if shortName in shortNames:
        return getattr(ULTIPA.PropertyType, prefix + shortName)
    return type
@staticmethod
def _formatTableHeader(headers):
    """Normalise table headers into a list of dicts plus a name->type map.

    :param headers: a list (or its JSON text) of headers; each header is
        either a dict with ``name`` and ``type`` (the type name is converted
        through ``FormatType.propertyType``) or an object exposing ``name``
        and ``type``
    :return: ``(headerList, headertype)`` where ``headerList`` holds
        ``{'property_name', 'property_type'}`` dicts and ``headertype`` maps
        each name to its type
    :raises ParameterException: when *headers* is not a list, or when an
        object header repeats a name that was already seen
    """
    if isinstance(headers, str):
        headers = json.loads(headers)
    if not isinstance(headers, list):
        raise ParameterException(err='The headers and rows must be list')

    headerList = []
    headertype = {}
    for header in headers:
        if isinstance(header, dict):
            propertyType = FormatType.propertyType(header.get('type'))
            name = header.get('name')
        else:
            name = header.name
            propertyType = header.type
            # The test used to be inverted: it rejected every *new* name and
            # accepted duplicates.
            if name in headertype:
                raise ParameterException('Attribute names cannot be the same')
        headerList.append({'property_name': name, 'property_type': propertyType})
        headertype[name] = propertyType
    return (headerList, headertype)
@staticmethod
def pathScheams(path):
    """Collect the node and edge schemas carried by a raw path.

    :param path: object exposing ``node_table.schemas`` and ``edge_table.schemas``
    :return: ``(nodeSchemas, edgeSchemas)``, each a dict mapping a schema name
        to a ``ULTIPA.Schema`` whose properties carry string type names
    """
    def collect(schemaDefs):
        # One ULTIPA.Schema per raw schema, keyed by schema name.
        collected = {}
        for schemaDef in schemaDefs:
            schemaName = schemaDef.schema_name
            converted = []
            for rawProp in schemaDef.properties:
                subTypes = [Property._getStringByPropertyType(sbt) for sbt in rawProp.sub_types]
                newproperty = ULTIPA.Property(rawProp.property_name, subTypes=subTypes)
                newproperty.type = Property._getStringByPropertyType(rawProp.property_type)
                converted.append(newproperty)
            collected[schemaName] = ULTIPA.Schema(schemaName, None, None, None, converted, None)
        return collected

    nodeSchemas = collect(path.node_table.schemas)
    edgeSchemas = collect(path.edge_table.schemas)
    return nodeSchemas, edgeSchemas
@staticmethod
def makeEntityNodeTable(schema: ULTIPA_REQUEST.Schema, entity_rows: List[ULTIPA.EntityRow],
                        timeZoneOffset=None) -> ULTIPA.NodeEntityTable:
    """Serialize node rows of one schema into a ``ULTIPA.NodeEntityTable``.

    Id-type and ignored properties are left out of both the schema header
    and the row values.  Every row is validated with
    ``FormatType.checkEntityNodeRow`` and each value is type-checked with
    ``TypeCheck.checkProperty`` before it is serialized.

    :param schema: schema describing the rows
    :param entity_rows: rows to serialize
    :param timeZoneOffset: passed through to the serializer
    :raises ParameterException: when validation, type checking or
        serialization of a value fails
    """
    nodetable = ULTIPA.NodeEntityTable([], [])
    header = {"schema_name": schema.name, "properties": []}
    nodetable.schemas.append(header)
    for prop in schema.properties:
        if prop.isIdType() or prop.isIgnore():
            continue
        header["properties"].append(
            {"property_name": prop.name, "property_type": prop.type, "sub_types": prop.subTypes})

    for i, row in enumerate(entity_rows):
        FormatType.checkEntityNodeRow(i, schema, row)
        values = []
        for proper in schema.properties:
            if proper.isIdType() or proper.isIgnore():
                continue
            if proper.name not in row.values:
                raise ParameterException(
                    err=f"node row [{row._getIndex()}] error: values doesn't contain property [{proper.name}].")
            value = row.values.get(proper.name)
            if isinstance(proper.type, str):
                ptype = proper.getPropertyTypeByString(proper.type)
            else:
                ptype = proper.type
            # checkProperty yields a bool on success and a message template otherwise.
            checkRet = TypeCheck.checkProperty(ptype, value)
            if not isinstance(checkRet, bool):
                raise ParameterException(
                    err=checkRet % ("node", row._getIndex(), proper.name, value))
            try:
                _seria = _Serialize(type=ptype, value=value, name=proper.name,
                                    timeZoneOffset=timeZoneOffset, subTypes=proper.subTypes)
                sdata = _seria.serialize()
            except Exception as e:
                detail = e
                # Only a string can be a "%s" template; testing membership on
                # any other first argument raised TypeError and hid the
                # real failure.
                if len(e.args) > 0 and isinstance(e.args[0], str) and "%s" in e.args[0]:
                    detail = e.args[0] % (proper.name, value)
                raise ParameterException(err=f"node row [{row._getIndex()}] error: {detail}") from e
            values.append(sdata)

        data = {"schema_name": schema.name, "values": values}
        if row.uuid:
            data["uuid"] = row.uuid
        if row.id:
            data["id"] = row.id
        nodetable.nodeRows.append(data)
    return nodetable
@staticmethod
def makeEntityEdgeTable(schema: ULTIPA_REQUEST.Schema, rows: List[ULTIPA.EntityRow],
                        timeZoneOffset=None) -> ULTIPA.EdgeEntityTable:
    """Serialize edge rows of one schema into a ``ULTIPA.EdgeEntityTable``.

    Id-type and ignored properties are left out of both the schema header
    and the row values.  Every row is validated with
    ``FormatType.checkEntityEdgeRow`` and each value is type-checked with
    ``TypeCheck.checkProperty`` before it is serialized.

    :raises ParameterException: when validation, type checking or
        serialization of a value fails
    """
    edgetable = ULTIPA.EdgeEntityTable([], [])
    edgetable.schemas.append({"schema_name": schema.name, "properties": []})
    # NOTE(review): unlike the node variant, no "sub_types" entry is written
    # into the schema header here - confirm whether that is intended.
    for prop in schema.properties:
        if not (prop.isIdType() or prop.isIgnore()):
            edgetable.schemas[0].get("properties").append(
                {"property_name": prop.name, "property_type": prop.type})

    # Row fields copied into the payload when truthy, in this order.
    endpointKeys = ("from_uuid", "to_uuid", "from_id", "to_id", "uuid")
    for i, row in enumerate(rows):
        FormatType.checkEntityEdgeRow(i, schema, row)
        values = []
        for proper in schema.properties:
            if proper.isIdType() or proper.isIgnore():
                continue
            if proper.name not in row.values:
                raise ParameterException(
                    err=f"edge row [{row._getIndex()}] error: values doesn't contain property [{proper.name}].")
            value = row.values.get(proper.name)
            ptype = proper.getPropertyTypeByString(proper.type) if isinstance(proper.type, str) else proper.type
            checkRet = TypeCheck.checkProperty(ptype, value)
            if not isinstance(checkRet, bool):
                raise ParameterException(
                    err=checkRet % ("edge", row._getIndex(), proper.name, value))
            try:
                _seria = _Serialize(type=ptype, value=value, name=proper.name,
                                    subTypes=proper.subTypes, timeZoneOffset=timeZoneOffset)
                sdata = _seria.serialize()
                values.append(sdata)
            except Exception as e:
                raise ParameterException(err=f"edge row [{row._getIndex()}] error:{e}.")

        data = {"schema_name": schema.name, "values": values}
        for key in endpointKeys:
            fieldValue = getattr(row, key)
            if fieldValue:
                data.update({key: fieldValue})
        edgetable.edgeRows.append(data)
    return edgetable
@staticmethod
def nodeAliases(_nodes, timeZone, timeZoneOffset) -> List[ULTIPA.NodeAlias]:
    """Decode every aliased node table into a ``ULTIPA.NodeAlias``.

    :return: one entry per item of *_nodes*; empty list when it is falsy
    """
    collected = []
    for entry in (_nodes or ()):
        decoded = FormatType.nodeTable(entry, timeZone, timeZoneOffset)
        collected.append(ULTIPA.NodeAlias(alias=entry.alias, nodes=decoded))
    return collected
@staticmethod
def attrNodeEntityTable(data) -> List[ULTIPA.Node]:
    """Return the rows of *data* as value-less nodes (see ``attrEntity``)."""
    nodes = FormatType.attrEntity(data, type="node")
    return nodes
@staticmethod
def attrEdgeEntityTable(data) -> List[ULTIPA.Edge]:
    """Return the rows of *data* as value-less edges (see ``attrEntity``)."""
    edges = FormatType.attrEntity(data, type="edge")
    return edges
@staticmethod
def edgeAliases(_edges, timeZone, timeZoneOffset) -> List[ULTIPA.EdgeAlias]:
    """Decode every aliased edge table into a ``ULTIPA.EdgeAlias``.

    :return: one entry per item of *_edges*; empty list when it is falsy
    """
    collected = []
    for entry in (_edges or ()):
        decoded = FormatType.edgeTable(entry, timeZone, timeZoneOffset)
        collected.append(ULTIPA.EdgeAlias(alias=entry.alias, edges=decoded))
    return collected
@staticmethod
def values(_values) -> 'dict':
    """Turn objects exposing ``key`` and ``value`` into a dict.

    Later items overwrite earlier ones with the same key; a falsy input
    gives an empty dict.
    """
    if not _values:
        return {}
    return {item.key: item.value for item in _values}
@classmethod
def parseAttrListData(cls, datas) -> List[ULTIPA.PaserAttrListData]:
    """Parse serialized ``AttrListData`` messages.

    Each item of *datas* is parsed with ``AttrListData.ParseFromString`` and,
    depending on the message type, its attrs, paths, nodes or edges are
    decoded.  The fields that do not match the type keep their defaults
    (empty lists for attrs/paths, ``None`` for nodes/edges).
    """
    parsed = []
    for blob in datas:
        attrs, paths = [], []
        nodes = edges = None
        message = AttrListData()
        message.ParseFromString(blob)
        kind = message.type
        if kind == ULTIPA.ResultType.RESULT_TYPE_ATTR:
            attrs.extend(cls.parseAttr(item, None) for item in message.attrs)
        elif kind == ULTIPA.ResultType.RESULT_TYPE_PATH:
            paths.extend(FormatType.attrPath(message.paths, None, None))
        elif kind == ULTIPA.ResultType.RESULT_TYPE_NODE:
            nodes = FormatType.attrNodeEntityTable(message.nodes)
        elif kind == ULTIPA.ResultType.RESULT_TYPE_EDGE:
            edges = FormatType.attrEdgeEntityTable(message.edges)
        parsed.append(
            ULTIPA.PaserAttrListData(message.type, nodes=nodes, edges=edges, paths=paths, attrs=attrs))
    return parsed
@classmethod
def parseAttr(cls, attr, aliasName) -> ULTIPA.Attr:
    """Decode a raw attr into a ``ULTIPA.Attr`` carrying *aliasName*.

    List-typed attrs are decoded through ``parseAttrListData``; every other
    type has each value unserialized with the attr's value type.
    """
    valueType = attr.value_type
    if valueType == ULTIPA.PropertyType.PROPERTY_LIST:
        decoded = FormatType.parseAttrListData(attr.values)
    else:
        decoded = [_Serialize(type=valueType, value=raw).unserialize() for raw in attr.values]
    return ULTIPA.Attr(alias=aliasName, values=decoded,
                       type=Property._getStringByPropertyType(valueType))
@classmethod
def parseAttrAlias(cls, attrAlias) -> ULTIPA.Attr:
    """Decode the attr of *attrAlias* and label it with its alias."""
    rawAttr = attrAlias.attr
    alias = attrAlias.alias
    return cls.parseAttr(rawAttr, alias)
@staticmethod
def attrAlias1(_attrs) -> List[ULTIPA.Attr]:
    """Decode every aliased attr; a falsy input gives an empty list."""
    if not _attrs:
        return []
    return [FormatType.parseAttrAlias(entry) for entry in _attrs]
# attr(attr, timeZone, timeZoneOffset): decode a raw attr into a
# ULTIPA.UltipaAttr whose `values` list is filled from attr.values.
#
# Per value, as far as the collapsed line shows:
#   * value type PROPERTY_NULL -> None is appended and has_attr_data is set;
#   * value type PROPERTY_LIST -> the value is parsed as an AttrListData
#     message; a message flagged is_null appends None; otherwise the message
#     type selects the payload: ATTR results are decoded recursively through
#     FormatType.attr, PATH / NODE / EDGE results are wrapped in
#     ULTIPA.PaserAttrListData and set has_ultipa_data;
#   * a branch that unserializes the value with _Serialize and sets
#     has_attr_data (the trailing `else:`).
#
# Inside the ATTR case the loop variable `attr` rebinds the parameter of the
# same name while `attr.values` is being iterated.
#
# NOTE(review): the original indentation is lost on this line. Two things
# cannot be determined from here and must be confirmed against the upstream
# file before reformatting: (1) whether `result.values.append(attrs)` is part
# of the `if len(attrs) != 0:` body, and (2) which `if` the final `else:`
# belongs to - presumably `if type == ...PROPERTY_LIST:`, so that non-list
# values are unserialized.
[docs] @staticmethod def attr(attr, timeZone, timeZoneOffset) -> ULTIPA.UltipaAttr: type = attr.value_type result = ULTIPA.UltipaAttr(type, None, has_attr_data=False, has_ultipa_data=False, type_desc=Property._getStringByPropertyType(type)) result.values = [] for value in attr.values: if type == ULTIPA.PropertyType.PROPERTY_NULL: result.has_attr_data = True result.values.append(None) continue if type == ULTIPA.PropertyType.PROPERTY_LIST: attrListData = AttrListData() attrListData.ParseFromString(value) result_type: any = attrListData.type if attrListData.is_null: result.values.append(None) continue if result_type == ULTIPA.ResultType.RESULT_TYPE_ATTR: attrs = [] for attr in attrListData.attrs: att = FormatType.attr(attr, timeZone, timeZoneOffset) if not att.values: attrs.append(att.values) else: attrs.extend(att.values) if len(attrs) != 0: result.has_attr_data = True result.values.append(attrs) if result_type == ULTIPA.ResultType.RESULT_TYPE_PATH: result.has_ultipa_data = True result.values.append(ULTIPA.PaserAttrListData(attrListData.type, paths=FormatType.attrPath(attrListData.paths, timeZone, timeZoneOffset))) if result_type == ULTIPA.ResultType.RESULT_TYPE_NODE: result.has_ultipa_data = True result.values.append(ULTIPA.PaserAttrListData(attrListData.type, nodes=FormatType.attrNodeEntityTable( attrListData.nodes))) if result_type == ULTIPA.ResultType.RESULT_TYPE_EDGE: result.has_ultipa_data = True result.values.append(ULTIPA.PaserAttrListData(attrListData.type, edges=FormatType.attrEdgeEntityTable( attrListData.edges))) else: _seria = _Serialize(type=type, value=value, timeZone=timeZone, timeZoneOffset=timeZoneOffset) sdata = _seria.unserialize() result.has_attr_data = True result.values.append(sdata) return result
@staticmethod
def attrAlias(_attrs, timeZone, timeZoneOffset) -> List[ULTIPA.AttrNewAlias]:
    """Decode every aliased attr into a ``ULTIPA.AttrNewAlias``.

    :return: one entry per item of *_attrs*; empty list when it is falsy
    """
    wrapped = []
    for entry in (_attrs or ()):
        decoded = FormatType.attr(entry.attr, timeZone, timeZoneOffset)
        wrapped.append(ULTIPA.AttrNewAlias(alias=entry.alias, attr=decoded))
    return wrapped
@staticmethod
def arrays(_arrays) -> List[ULTIPA.ArrayAlias]:
    """Decode aliased arrays into ``ULTIPA.ArrayAlias`` objects.

    Every value of every element is unserialized with the array's
    ``property_type``; a falsy input gives an empty list.
    """
    converted = []
    for entry in (_arrays or ()):
        elementType = entry.property_type
        decoded = [
            [_Serialize(type=elementType, value=raw).unserialize() for raw in element.values]
            for element in entry.elements
        ]
        converted.append(ULTIPA.ArrayAlias(alias=entry.alias, elements=decoded))
    return converted
@staticmethod
def resultalias(_arrays) -> List[ULTIPA.ResultAlias]:
    """Pair each alias with the string form of its result type.

    A falsy input gives an empty list.
    """
    if not _arrays:
        return []
    return [
        ULTIPA.ResultAlias(entry.alias, ResultType.getTypeStr(entry.result_type))
        for entry in _arrays
    ]
@staticmethod
def explainPlan(_explainPlan) -> List[ULTIPA.ExplainPlan]:
    """Convert the ``plan_nodes`` of a raw explain plan.

    Each plan node contributes its alias, children_num, uql and infos; a
    falsy input gives an empty list.
    """
    if not _explainPlan:
        return []
    return [
        ULTIPA.ExplainPlan(planNode.alias, planNode.children_num, planNode.uql, planNode.infos)
        for planNode in _explainPlan.plan_nodes
    ]
@staticmethod
def export_edges(_edges, timeZone, timeZoneOffset) -> List:
    """Decode exported edges; thin wrapper around ``FormatType.edgeTable``."""
    return FormatType.edgeTable(_edges, timeZone, timeZoneOffset)
@staticmethod
def export_nodes(_nodes, timeZone, timeZoneOffset) -> List:
    """Decode exported nodes; thin wrapper around ``FormatType.nodeTable``."""
    return FormatType.nodeTable(_nodes, timeZone, timeZoneOffset)
@staticmethod
def pathAlias(_paths, timeZone, timeZoneOffset) -> [ULTIPA.PathAlias]:
    """Decode aliased path groups into ``ULTIPA.PathAlias`` objects.

    Every raw path of a group is decoded into a ``ULTIPA.Path`` (nodes,
    edges and both schema maps) and appended to the group's ``paths``.
    A falsy input gives an empty list.
    """
    collected = []
    for group in (_paths or ()):
        aliasEntry = ULTIPA.PathAlias(group.alias)
        for rawPath in group.paths:
            nodes = FormatType.nodeTable(rawPath, timeZone, timeZoneOffset)
            edges = FormatType.edgeTable(rawPath, timeZone, timeZoneOffset)
            nodeSchemas, edgeSchemas = FormatType.pathScheams(rawPath)
            aliasEntry.paths.append(
                ULTIPA.Path(nodes=nodes, edges=edges, nodeSchemas=nodeSchemas, edgeSchemas=edgeSchemas))
        collected.append(aliasEntry)
    return collected
@staticmethod
def attrPath(_paths, timeZone, timeZoneOffset) -> [ULTIPA.Path]:
    """Convert raw paths into ``ULTIPA.Path`` objects.

    :param _paths: iterable of raw paths; a falsy value produces an empty
        list.
    :param timeZone: passed through to the node/edge table formatters.
    :param timeZoneOffset: passed through to the node/edge table formatters.
    :return: one ``ULTIPA.Path`` per raw path, in input order.
    """
    def build(raw):
        # Same call order as before: nodes, then edges, then schemas.
        node_rows = FormatType.nodeTable(raw, timeZone, timeZoneOffset)
        edge_rows = FormatType.edgeTable(raw, timeZone, timeZoneOffset)
        node_schemas, edge_schemas = FormatType.pathScheams(raw)
        return ULTIPA.Path(
            nodes=node_rows,
            edges=edge_rows,
            nodeSchemas=node_schemas,
            edgeSchemas=edge_schemas,
        )

    if not _paths:
        return []
    return [build(raw) for raw in _paths]
@staticmethod
def Response(_res, host: str = None) -> ULTIPA_RESPONSE.Response:
    """Wrap the status of a raw reply in an ``ULTIPA_RESPONSE.Response``.

    :param _res: raw reply exposing ``status``.
    :param host: optional host, forwarded to ``FormatType.status``.
    :return: a ``Response`` constructed from the formatted status only.
    """
    return ULTIPA_RESPONSE.Response(FormatType.status(_res.status, host))
@staticmethod
def mergeUqlResponse(mergeRes: ULTIPA_RESPONSE.Response, res: ULTIPA_RESPONSE.Response):
    """Fold ``res`` into the accumulated response ``mergeRes``.

    When ``mergeRes`` has no data yet, a deep copy of ``res`` is returned.
    Otherwise every non-empty data section of ``res`` is merged into the
    matching section of ``mergeRes`` with ``DataMerge.concat``, and the
    statistics are merged the same way.

    :param mergeRes: accumulator; mutated in place when it already has data.
    :param res: response to merge in.
    :return: the accumulator (or the deep copy of ``res`` on first use).
    """
    if not mergeRes.data:
        return copy.deepcopy(res)

    # (data attribute, extra positional arguments handed to DataMerge.concat)
    merge_plan = (
        ("paths", ()),
        ("nodes", ('nodes',)),
        ("edges", ('edges',)),
        ("attrs", ('values',)),
        ("tables", ("table_name", ["headers"])),
        ("explainPlan", ()),
        ("resultAlias", ()),
    )
    for section, extra_args in merge_plan:
        incoming = getattr(res.data, section)
        if incoming:
            combined = DataMerge.concat(getattr(mergeRes.data, section), incoming, *extra_args)
            setattr(mergeRes.data, section, combined)

    if res.statistics:
        mergeRes.statistics = DataMerge.concat(mergeRes.statistics, res.statistics)
    return mergeRes
@staticmethod
def response(uql_response: ULTIPA_RESPONSE.Response, uqlReply, timeZone,
             timeZoneOffset) -> ULTIPA_RESPONSE.Response:
    """Populate ``uql_response`` from one raw UQL reply and return it.

    The reply's attrs are first converted with ``FormatType.attrAlias`` and
    then regrouped per alias into ``ULTIPA.Attr`` objects; tables, paths,
    nodes, edges, statistics, result aliases and the explain plan are
    converted by their respective ``FormatType`` helpers.

    :param uql_response: response object that is filled in place.
    :param uqlReply: raw reply exposing ``attrs``, ``status``, ``tables``,
        ``paths``, ``nodes``, ``edges``, ``statistics``, ``alias`` and
        ``explain_plan``.
    :param timeZone: passed through to the ``FormatType`` helpers.
    :param timeZoneOffset: passed through to the ``FormatType`` helpers.
    :return: the same ``uql_response`` object, with ``status``,
        ``statistics``, ``data`` and ``aliases`` overwritten.
    """
    # NOTE(review): the nesting below was reconstructed from a flattened
    # listing of this method - confirm against the original file.
    attrs = FormatType.attrAlias(uqlReply.attrs, timeZone, timeZoneOffset)
    attrs_attrs = []  # ULTIPA.Attr objects in first-seen alias order
    attrs_map = {}  # alias -> the ULTIPA.Attr stored in attrs_attrs

    # NOTE: the parameter name `type` shadows the builtin inside this closure.
    def addAttributes(alias: str, type: ULTIPA.ResultType, values: any, dataMap: HasDataMap) -> None:
        # First time this alias is seen: create its Attr container.
        if attrs_map.get(alias) is None:
            # Attr-only data forces the ATTR result type for the container.
            if dataMap.has_attr_data:
                type = ULTIPA.ResultType.RESULT_TYPE_ATTR
            attralias = ULTIPA.Attr(alias=alias, type=type, type_desc=ULTIPA.ResultType.getTypeStr(type),
                                    values=[])
            attrs_map[alias] = attralias
            attrs_attrs.append(attralias)
        # A pure attr list replaces the values wholesale; anything else is
        # appended as one more item.
        if type == ULTIPA.ResultType.RESULT_TYPE_ATTR and dataMap.only_attr_list:
            attrs_map[alias].values = values
        else:
            attrs_map[alias].values.append(values)

    for attrAlias in attrs:
        alias = attrAlias.alias
        has_attr_data = attrAlias.attr.has_attr_data
        has_ultipa_data = attrAlias.attr.has_ultipa_data
        only_attr_list = has_attr_data and not has_ultipa_data
        dataMap = HasDataMap(has_ultipa_data=has_ultipa_data, has_attr_data=has_attr_data,
                             only_attr_list=only_attr_list)
        if attrAlias.attr.type == ULTIPA.PropertyType.PROPERTY_LIST:
            if only_attr_list:
                # Whole list is plain attr data: store it in one go.
                addAttributes(alias, ULTIPA.ResultType.RESULT_TYPE_ATTR, attrAlias.attr.values, dataMap)
                continue
            for row in attrAlias.attr.values:
                # Falsy rows, or rows whose `type` is falsy, are stored as ATTR.
                # NOTE(review): `row.type` is read before the isinstance check
                # below, so a truthy row without a `type` attribute would
                # raise here - confirm such rows cannot occur.
                if not row or not row.type:
                    addAttributes(alias, ULTIPA.ResultType.RESULT_TYPE_ATTR, row, dataMap)
                    continue
                if not isinstance(row, ULTIPA.PaserAttrListData):
                    addAttributes(alias, ULTIPA.ResultType.RESULT_TYPE_ATTR, row, dataMap)
                    continue
                # Parsed list data: store the payload that matches its type.
                # Rows of any other type are not stored.
                if row.type == ULTIPA.ResultType.RESULT_TYPE_NODE:
                    addAttributes(alias, row.type, row.nodes, dataMap)
                elif row.type == ULTIPA.ResultType.RESULT_TYPE_EDGE:
                    addAttributes(alias, row.type, row.edges, dataMap)
                elif row.type == ULTIPA.ResultType.RESULT_TYPE_PATH:
                    addAttributes(alias, row.type, row.paths, dataMap)
        else:
            # Non-list attrs are stored as a single ATTR entry.
            addAttributes(alias, ULTIPA.ResultType.RESULT_TYPE_ATTR, attrAlias.attr.values, dataMap)

    status = FormatType.status(uqlReply.status)
    tables = FormatType.tables(uqlReply.tables, timeZone, timeZoneOffset)
    paths = FormatType.pathAlias(uqlReply.paths, timeZone, timeZoneOffset)
    nodes = FormatType.nodeAliases(uqlReply.nodes, timeZone, timeZoneOffset)  # ForamtType.nodeAliases
    edges = FormatType.edgeAliases(uqlReply.edges, timeZone, timeZoneOffset)
    # From here on `attrs` refers to the regrouped per-alias Attr objects.
    attrs = attrs_attrs
    statistics = FormatType.statistics(uqlReply.statistics, timeZone, timeZoneOffset)
    resultAlias = FormatType.resultalias(uqlReply.alias)
    explainPlan = FormatType.explainPlan(uqlReply.explain_plan)
    baseReply = ULTIPA.BaseUqlReply(paths=paths, nodes=nodes, edges=edges, tables=tables, attrs=attrs,
                                    resultAlias=resultAlias, explainPlan=explainPlan)
    uql_response.status = status
    uql_response.statistics = statistics
    uql_response.data = baseReply
    uql_response.aliases = resultAlias
    return uql_response
@staticmethod
def uqlResponse(_res, timeZone, timeZoneOffset) -> ULTIPA_RESPONSE.UltipaResponse:
    """Stream formatted responses, one per raw UQL reply.

    This function is a generator: it yields an
    ``ULTIPA_RESPONSE.UltipaResponse`` for every reply in ``_res``. When a
    reply carries a non-success status, a response holding that status is
    yielded and iteration stops.

    The same ``UltipaResponse`` object is updated and yielded on every
    iteration, so consumers should read each item before advancing.

    :param _res: iterable of raw UQL replies exposing ``status``.
    :param timeZone: passed through to ``FormatType.response``.
    :param timeZoneOffset: passed through to ``FormatType.response``.
    :return: generator of ``UltipaResponse`` objects.
    """
    uql_response = ULTIPA_RESPONSE.Response()
    ultipa_response = ULTIPA_RESPONSE.UltipaResponse()
    for uqlReply in _res:
        status = FormatType.status(uqlReply.status)
        uql_response = FormatType.response(uql_response, uqlReply, timeZone, timeZoneOffset)
        ret = ULTIPA.UqlReply(dataBase=uql_response.data)
        if status.code != ULTIPA.Code.SUCCESS:
            ultipa_response.status = uql_response.status
            ultipa_response.req = uql_response.req
            # Inside a generator a bare `return value` only sets
            # StopIteration.value, which a plain `for` loop discards; yield
            # the error response so iterating callers actually receive it.
            yield ultipa_response
            # Still return it so `yield from` callers keep getting the value.
            return ultipa_response
        ultipa_response.items = ret._aliasMap
        ultipa_response.status = uql_response.status
        ultipa_response.req = uql_response.req
        ultipa_response.statistics = uql_response.statistics
        yield ultipa_response
@staticmethod
def uqlMergeResponse(_res, timeZone, timeZoneOffset) -> ULTIPA_RESPONSE.UltipaResponse:
    """Consume all raw UQL replies and return one merged response.

    Each reply is formatted with ``FormatType.response`` and folded into an
    accumulator with ``FormatType.mergeUqlResponse``. The first reply with a
    non-success status ends processing: a response carrying only that
    reply's ``status`` and ``req`` is returned.

    :param _res: iterable of raw UQL replies exposing ``status``.
    :param timeZone: passed through to ``FormatType.response``.
    :param timeZoneOffset: passed through to ``FormatType.response``.
    :return: ``UltipaResponse`` whose ``items`` and ``explainPlan`` come from
        the merged data, while ``status``, ``req``, ``aliases`` and
        ``statistics`` come from the most recently formatted reply.
    """
    latest = ULTIPA_RESPONSE.Response()
    merged = ULTIPA_RESPONSE.Response()
    outcome = ULTIPA_RESPONSE.UltipaResponse()

    for reply in _res:
        reply_status = FormatType.status(reply.status)
        latest = FormatType.response(latest, reply, timeZone, timeZoneOffset)
        if reply_status.code != ULTIPA.Code.SUCCESS:
            # Bail out with the failing reply's status; nothing is merged.
            outcome.status = latest.status
            outcome.req = latest.req
            return outcome
        merged = FormatType.mergeUqlResponse(merged, latest)

    merged_reply = ULTIPA.UqlReply(dataBase=merged.data)
    outcome.items = merged_reply._aliasMap
    outcome.explainPlan = merged_reply.explainPlan
    outcome.status = latest.status
    outcome.req = latest.req
    outcome.aliases = latest.aliases
    outcome.statistics = latest.statistics
    return outcome
@staticmethod
def downloadResponse(_res) -> ULTIPA_RESPONSE.Response:
    """Build a ``Response`` from a stream of download replies.

    A new ``Response`` is built for every reply, so the returned object
    reflects the last reply in ``_res``. If ``_res`` yields nothing, an
    empty ``ULTIPA_RESPONSE.Response()`` is returned instead of failing on
    an unbound local.

    :param _res: iterable of replies exposing ``status``, ``chunk`` and
        ``total_time_cost``.
    :return: ``Response`` for the last reply, or an empty ``Response``.
    """
    # Start from an empty response so an empty stream does not raise
    # UnboundLocalError at the return statement.
    res = ULTIPA_RESPONSE.Response()
    for uqlReply in _res:
        status = FormatType.status(uqlReply.status)
        # NOTE(review): the chunk is passed through FormatType.status - looks
        # unintended for payload data; confirm before changing.
        data = FormatType.status(uqlReply.chunk)
        total_time = uqlReply.total_time_cost
        # NOTE(review): engine_time is read from total_time_cost as well;
        # possibly meant to be an engine-specific field - confirm against the
        # reply definition.
        engine_time = uqlReply.total_time_cost
        res = ULTIPA_RESPONSE.Response(status, total_time, engine_time, data)
    return res
@staticmethod
def exportResponse(_res, timeZone, timeZoneOffset) -> ULTIPA_RESPONSE.Response:
    """Build a ``Response`` from an export reply stream.

    :param _res: iterable of replies exposing ``status``, ``node_table`` and
        ``edge_table``.
    :param timeZone: passed through to ``FormatType.export_nodes`` /
        ``FormatType.export_edges``.
    :param timeZoneOffset: passed through to the same helpers.
    :return: ``Response`` with ``status`` set from the processed reply and
        ``data`` set to the exported node or edge rows; an untouched
        ``Response()`` when ``_res`` yields nothing.
    """
    nodedata = []
    edgedata = []
    res = ULTIPA_RESPONSE.Response()
    for uqlReply in _res:
        res.status = FormatType.status(uqlReply.status)
        if uqlReply.node_table:
            nodedata = FormatType.export_nodes(uqlReply, timeZone, timeZoneOffset)
        if uqlReply.edge_table:
            edgedata = FormatType.export_edges(uqlReply, timeZone, timeZoneOffset)
        if nodedata:
            uql = ULTIPA.ExportReply(data=nodedata)
            res.data = uql.data
        # When both tables are present the edge data overwrites the node data.
        if edgedata:
            uql = ULTIPA.ExportReply(data=edgedata)
            res.data = uql.data
        # NOTE(review): indentation of this `break` was reconstructed from a
        # flattened listing; as placed here only the first reply is
        # processed - confirm against the original file.
        break
    return res
[docs] @staticmethod def graphPrivileges(graphPrivileges: '[object]'): new_graphPrivileges = [] if graphPrivileges: for gp in graphPrivileges: graphDcit = {} graphDcit.update({'name': list(gp.keys())[0]}) graphDcit.update({'values': list(gp.values())[0]}) new_graphPrivileges.append(graphDcit) return new_graphPrivileges