Spark与TDengine连接总结
TDengine是一款高性能的时序数据库,最近在研究如何在Spark中读取和写入数据,这里记录一下最近的成果。
版本信息:
- Spark版本:2.4.0,yarn集群模式
- Python版本:3.7.9
- Scala版本:2.11.12
- TDengine版本:2.6,商业版
- TAOS-JDBC版本:2.0.42
1. 官方JDBC使用
TDengine官方提供了JDBC,Spark读取和写入均可以直接使用
1.1 依赖问题
- 需要引用taos-jdbcdriver,版本上强烈推荐2.0.42,其他版本会有各种问题(针对2.x系列,3.x系列的没用过);
1 | <dependency> |
- taos-jdbcdriver的依赖与spark集群可能存在冲突(版本不同),需要手动指定依赖,方法为:
- 下载正确的依赖,我用的是guava-30.1.1-jre.jar和failureaccess-1.0.1.jar
- 将jar上传hadoop;
- 在提交spark任务的时候,手动制定依赖:
1 | --conf spark.driver.extraClassPath=guava-30.1.1-jre.jar:failureaccess-1.0.1.jar \ |
- 对于pyspark程序,最好引用taos-jdbcdriver-2.0.42-dist.jar,补足依赖
1 | --conf spark.driver.extraClassPath=guava-30.1.1-jre.jar:failureaccess-1.0.1.jar:taos-jdbcdriver-2.0.42-dist.jar |
1.2 读取
Spark通过TDBC读取TDengine数据和其他数据库并没有什么本质区别,可以通过以下方法来读取:
- 读取整张表
1 | info = spark.read\ |
- 读取部分内容
1 | info = spark.read\ |
如果读取的数量较多,可能会遇到timeout错误,可以设置httpSocketTimeout,单位是ms,默认值为5000
1.3 存储
可以使用下面的方法向TDengine保存数据
1 | toHbaseSave |
2. 使用自定义数据源写入数据
对于spark来说,直接使用官方的jdbc进行连接只能实现向普通表中写入数据,无法利用TDengine超级表的特性,为了能向超级表中写入数据,需要自定义数据源。
由于只需要写入特性,因此下面的内容中只实现了写入的逻辑,没有实现读取的逻辑,使用scala编写。
继承DataSourceV2和WriteSupport 并重写createWriter方法,创建自定义的数据源;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19class TDSourceV2 extends DataSourceV2 with WriteSupport with Serializable {
override def createWriter(
jobId: String,
structType: StructType,
saveMode: SaveMode,
dataSourceOptions: DataSourceOptions): Optional[DataSourceWriter] = {
Optional.of(new TDSourceWriter(
// url
dataSourceOptions.get("url").get(),
// 用户名
dataSourceOptions.get("user").get(),
// 密码
dataSourceOptions.get("password").get(),
// 数据库名称
dataSourceOptions.get("db").get(),
// 超级表名称
dataSourceOptions.get("stable").get()))
}继承 DataSourceWriter 重写 createWriterFactory 方法并返回自定义的 DataWriterFactory,重写 commit 方法,用来提交整个事务, 重写 abort 方法,用来做事务回滚;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15class TDSourceWriter(
url: String,
user: String,
password: String,
db: String,
stable: String) extends DataSourceWriter with Serializable{
override def createWriterFactory(): DataWriterFactory[InternalRow] = {
new TDWriterFactory(url, user, password, db, stable)
}
// 2.6版本的TDengine不支持事务
override def commit(writerCommitMessages: Array[WriterCommitMessage]): Unit = Unit
// 2.6版本的TDengine不支持事务
override def abort(writerCommitMessages: Array[WriterCommitMessage]): Unit = Unit
}继承 DataWriterFactory, 重写 createDataWriter方法返回自定义的 DataWriter;
1
2
3
4
5
6
7
8
9
10
11
12
13class TDWriterFactory(
url: String,
user: String,
password: String,
db: String,
stable: String) extends DataWriterFactory[InternalRow] with Serializable {
override def createDataWriter(
partitionId: Int,
taskId: Long,
epochId: Long): DataWriter[InternalRow] = {
new TDDataWriter(url, user, password, db, stable)
}
}继承 DataWriter 重写 write 方法实现具体的写入数据库逻辑,重写 commit 方法用来提交事务,重写 abort 方法用来做事务回滚 ;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57class TDDataWriter(
url: String,
user: String,
password: String,
db: String,
stable: String) extends DataWriter[InternalRow] with Serializable {
private val logger = LoggerFactory.getLogger(this.getClass)
private var conn: Connection = null
private var stmt: Statement = null
override def write(record: InternalRow): Unit = {
// 在这里编写将数据插入数据库的逻辑
Class.forName("com.taosdata.jdbc.rs.RestfulDriver")
val jdbcUrl = s"jdbc:TAOS-RS://$url/$db?user=$user&password=$password"
// 可以通过record.getxxx获取对应列的内容
// 如table_name = record.getString(0)
val variable1 = record.getString(0)
val variable2 = record.getInt(1)
val variable3 = record.getFloat(2)
// 构建插入数据库的sql语句
// 请注意,只有原生连接方式才支持参数绑定的插入方式,
// 由于要在spark集群上跑,这里用的是Rest的连接方式,
// 所以只能手动构建插入数据库的sql语句,
// 关于原生连接、Rest连接和参数绑定插入,请见官方文档
val sql = "insert to ..."
logger.info(sql)
conn = DriverManager.getConnection(jdbcUrl)
stmt = conn.createStatement()
stmt.execute(s"use $db")
try{
stmt.execute(sql)
conn.commit()
}catch {
case e: Exception => e.printStackTrace()
}finally {
conn.close()
}
}
// Tdengine2.6版本不支持事务,因此此处并无实际需要做的事情
// 创建WriterCommitMessage类,绝对不能传null,网上有些代码是错误的
object WriteSucceeded extends WriterCommitMessage
override def commit(): WriterCommitMessage = WriteSucceeded
// Tdengine2.6版本不支持事务,因此此处并无实际需要做的事情
override def abort(): Unit = Unit
}调用,在程序中指定自定义的datasource并传入参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15toTdSave
.write
.format("org.example.TDSourceV2")
.mode(SaveMode.Append)
// url
.option("url", url)
// 数据库名
.option("db", db)
// 用户名
.option("user", user)
// 超级表名
.option("stable", stable)
// 密码
.option("password", password)
.save()
参考文献: