概述
嬴图Spark Connector通过嬴图Java SDK连接嬴图和Apache Spark,用于在Spark环境中对嬴图数据库进行读取和写入。
嬴图Spark Connector基于最新的Spark DataSource API,支持与Spark交互的各种语言,包括Scala、Python、Java和R。本手册以Scala为例进行说明,使用其他语言只需进行少量的语法调整。
安装
版本要求
嬴图Spark Connector支持以下版本的嬴图和Spark:
- 嬴图v4.x(v4.3及以上),单实例或集群均可
- 使用Scala 2.12的Spark 2.4.8
导入依赖
在pom.xml文件中导入嬴图Spark Connector依赖:
<dependencies>
<dependency>
<groupId>com.ultipa</groupId>
<artifactId>ultipa-spark-connector</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
读取
你可以根据点schema、边schema或一个嬴图GQL查询语句从嬴图数据库读取数据为一个Spark DataFrame。
Spark不支持嬴图所有的属性数据类型,详情请参考数据类型转换。
通过点Schema读
读取指定schema中所有点的_id
和自定义属性数据。
示例:从Test图集读取所有Person点数据
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.format("com.ultipa.spark.DataSource")
.option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
.option("auth.username","root")
.option("auth.password","root")
.option("graph","Test")
.option("nodes","Person")
.load()
df.show()
结果:
_id | name | gender |
---|---|---|
U001 | Alice | female |
U002 | Bruce | male |
U003 | Joe | male |
通过边Schema读
读取指定schema中所有边的_from
、_to
和自定义属性数据。
示例:从Test图集读取所有Follows边数据
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.format("com.ultipa.spark.DataSource")
.option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
.option("auth.username","root")
.option("auth.password","root")
.option("graph","Test")
.option("edges","Follows")
.load()
df.show()
结果:
_from | _to | since |
level |
---|---|---|---|
U001 | U002 | 2019-12-15 12:10:09 | 1 |
U003 | U001 | 2021-1-20 09:15:02 | 2 |
通过嬴图GQL读
读取一个嬴图GQL语句返回的数据。该嬴图GQL语句必须包含RETURN子句,且返回的数据类型为ATTR或TABLE。不支持其他的返回值类型(如NODE、EDGE和PATH等)。什么是返回值类型
示例:从Test图集读取一个嬴图GQL语句返回的数据
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.format("com.ultipa.spark.DataSource")
.option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
.option("auth.username","root")
.option("auth.password","root")
.option("graph","Test")
.option("query","find().nodes() as n return n.name, n.gender")
.load()
df.show()
结果:
n.name | n.gender |
---|---|
Alice | female |
Bruce | male |
Joe | male |
写入
你可以将一个Spark DataFrame作为点或边数据写入嬴图数据库的一个schema中。DataFrame中的每列映射为点或边的一个属性,列名就是属性名(点的_id
以及边的_from
和_to
属性除外),不存在的属性会在写入过程中自动创建。
每个属性的数据类型由DataFrame每列的数据类型决定,详情请参考数据类型转换。
写入点Schema
示例:将一个DataFrame写为Test图集中的Person点
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val data = Seq(("Alice", "Teacher", 25, 1.11), ("Bob", "Worker", 30, 2.22), ("Charlie", "Officer", 35, 3.33))
val df = spark.createDataFrame(data).toDF("name", "job", "age", "income")
df.show()
df.write.format("com.ultipa.spark.DataSource")
.option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
.option("auth.username","root")
.option("auth.password","root")
.option("graph","Test")
.option("nodes", "Person")
.option("nodes.id", "name")
.save()
写入边Schema
示例:将一个DataFrame写为Test图集中的RelatesTo边
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val data = Seq(("Alice", "Bob", "couple"), ("Bob", "Charlie", "couple"), ("Charlie", "Alice", "friend"))
val df = spark.createDataFrame(data).toDF("from", "to", "type")
df.show()
df.write.format("com.ultipa.spark.DataSource")
.option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
.option("auth.username","root")
.option("auth.password","root")
.option("graph","Test")
.option("edges", "RelatesTo")
.option("edges.from", "from")
.option("edges.to", "to")
.save()
配置
Options
在Spark API中,DataFrameReader
和DataFrameWriter
类都提供option()
方法,用于指定读取或写入操作的各项参数。
以下是嬴图Spark Connector支持的所有options:
通用
Option键 |
默认值 |
描述 | 可选 |
---|---|---|---|
hosts | 嬴图服务器或集群(英文逗号隔开)的IP地址或URL地址(不包括 "https://" 或 "http://") | 否 | |
auth.username | 用户名 | 否 | |
auth.password | 密码 | 否 | |
graph | default | 要连接的图集名 | 是 |
connection.timeout | 15 | 请求超时时间(秒) | 是 |
connection.connect.timeout | 2000 | 连接超时时间(毫秒),默认每个节点尝试三次 | 是 |
connection.heartbeat | 10000 | 所有实例的心跳时间(毫秒),0表示关闭心跳 | 是 |
connection.max.recv.size | 41943040 | 接收数据的最大字节数 | 是 |
读取
Option键 |
默认值 |
描述 | 可选 |
---|---|---|---|
nodes | 点schema名称 | 是 | |
edges | 边schema名称 | 是 | |
query | 读取数据的嬴图GQL查询语句 | 是 |
写入
Option键 |
默认值 |
描述 | 可选 |
---|---|---|---|
nodes | 点schema名称;如果指定的schema不存在,则在写入过程中自动创建 | 是 | |
nodes.id | 作为点的_id 属性的DataFrame中的一个列名 |
是 | |
edges | 边schema名称;如果指定的schema不存在,则在写入过程中自动创建 | 是 | |
edges.from | 作为边的_from 属性的DataFrame中的一个列名 |
是 | |
edges.to | 作为边的_to 属性的DataFrame中的一个列名 |
是 |
全局配置
你可以在每次连接时设置options,或在Spark会话中进行全局配置来避免重复设置。为此,可以通过在config()
方法实现,但注意此时需要在option键名称前加上ultipa.
。
示例:对hosts
、auth.username
、auth.password
和connection.timeout
这几个option进行全局配置
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder()
.config("ultipa.hosts", "192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
.config("ultipa.auth.username","root")
.config("ultipa.auth.password","root")
.config("ultipa.graph", "Test")
.config("ultipa.connection.timeout", 600)
.getOrCreate()
val dfPerson = spark.read.format("com.ultipa.spark.DataSource")
.option("nodes", "Person")
.load()
数据类型转换
嬴图属性类型 | Spark数据类型 |
---|---|
_id , _from , _to |
StringType |
_uuid , _from_uuid , _to_uuid |
LongType |
int32 |
IntegerType |
uint32 |
LongType |
int64 |
LongType |
uint64 |
StringType |
float |
FloatType |
double |
DoubleType |
decimal |
|
string |
StringType |
text |
|
datetime |
TimestampType |
timestamp |
TimestampType |
point |
|
blob |
BinaryType |
list |
|
set |
|
ignore |
NullType |
UNSET |
NullType |
_ | StringType |