摘要:开启vsftpd服务,注意修改,默认路径(gbload_server_9.5.25版本支持sftp协议)
原文链接:datax+gbload+GBase入库适配操作手顺|GBASE社区|天津南大通用数据技术股份有限公司|GBASE-致力于成为用户最信赖的数据库产品供应商
更多精彩内容尽在GBASE社区|天津南大通用数据技术股份有限公司|GBASE-致力于成为用户最信赖的数据库产品供应商
一、环境准备
1、开启vsftpd服务,注意修改,默认路径(gbload_server_9.5.25版本支持sftp协议)
userlist_enable=YES -- 设定userlist 可用,一般是deny。这个不强求。如果是,将对应用户从列表注销(ftpuser、user_list)
local_root=/opt/gbload -- 设定一个默认目录。对应下面 gbload_ftp_server_base_path 这个路径。
chroot_local_user=YES -- 限制在一个目录。
allow_writeable_chroot=YES -- 这个目录可写
2、安装gccli客户端
注意修改环境变量,参考配置文件
3、dispcli (区分python2 和3 需要申请特定注意。要python某个版本编译的)
放入目录,比如 /opt/dispcli (这里里面才是dispcli文件)
4、JAVA环境变量 (注意将sgloader 和jdbc 也配置进去。不一定有用,配置了肯定可以)
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-11.b12.el7.x86_64
export PATH=.:$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:/opt/gbloader_test/ gbase-connector-java-8.3.81.53-build55.4.1-bin.jar:/opt/gbloader_test/sgLoader/sgloader-0.0.1.jar
二、gbloader部署
1、解压后有2个目录,api_File 、server_file ,比如放在/opt/gbload_server
2、创建目录用于放加载临时文件
mkdir -p /opt/ gbload_server /data/main
mkdir -p /opt/ gbload_server /data/copy
mkdir -p /opt/ gbload_server /data/kvstore
3、修改gbload_server的配置文件,在/opt/gbload_server/server_file/下面,名字叫gbload_server.conf
修改的几个内容如下:
gbload_config_server_host=10.10.3.233 #就是部署本机的IP地址
gbload_config_extern_master_server=10.10.3.233 #就是部署本机的IP地址,以后如果加载机有扩展的情况下,这里写的是master 的IP地址,只有一台加载机则就是本机的IP地址
gbload_data_key_retention_time=2 #表加载临时信息的保存时间,单位位秒,设置2就行;
gbload_ftp_server_host_name=10.10.3.233 #就是部署本机的IP地址
gbload_ftp_server_user_passwd=gbase:gbase #连接本机ftp服务的用户名和密码,格式为user:passwd
gbload_ftp_server_base_path=/opt/gbloader_test/server_file/ #本机ftp服务的主目录,用户由gbload_ftp_server_user_passwd指定,加载服务必须对此目录有读写权限
gbload_dispatch_server_ip_port=10.10.3.235:8080 #dispatch服务的IP和端口
gbload_dispatch_client_path=/opt/gbloader_test/dispatch_server/ #dispcli所在的路径
gbload_gnode_gbloader_path=/opt/gbloader_test/server_file/ #gbloader所在的路径
gbload_database_server_host=10.10.3.109 #数据库节点的IP地址;需要制定本地gcluster/gnode使用的IP地址(GBase MPP Cluster的一个管理节点的IP)
gbload_file_count_per_controy_file=100000 #每个控制文件处理的数据文件个数
4、启动加载服务,在/opt/gbload_server/server_file下执行nohup sh ./ gbload_server_monit.sh &即可,并通过ps –ef|grep gbload_server看进程是否启动
5、gbload加载测试样例
表结构Create database testdb;
Use testdb;
CREATE TABLE "lineorder" (
"lo_orderkey" bigint(20) DEFAULT NULL,
"lo_linenumber" int(11) DEFAULT NULL,
"lo_custkey" int(11) DEFAULT NULL,
"lo_partkey" int(11) DEFAULT NULL,
"lo_suppkey" int(11) DEFAULT NULL,
"lo_orderdate" int(11) DEFAULT NULL,
"lo_orderpriority" varchar(15) DEFAULT NULL,
"lo_shippriority" varchar(1) DEFAULT NULL,
"lo_quantity" int(11) DEFAULT NULL,
"lo_extendedprice" int(11) DEFAULT NULL,
"lo_ordtotalprice" int(11) DEFAULT NULL,
"lo_discount" int(11) DEFAULT NULL,
"lo_revenue" int(11) DEFAULT NULL,
"lo_supplycost" int(11) DEFAULT NULL,
"lo_tax" int(11) DEFAULT NULL,
"lo_commitdate" int(11) DEFAULT NULL,
"lo_shipmode" varchar(10) DEFAULT NULL
) COMPRESS(5, 5) ENGINE=EXPRESS DISTRIBUTED BY('lo_orderkey');
Java样例代码以下代码用的时候,修改红色加粗的IP地址就行,就是运行gbload_server的这台机器的IP地址。
url = "jdbc:gbload://" + host + ":" + port + "/" + db + "?failoverEnable=true&hostList=" + "10.10.3.233" + "&basePath=" + sgLoader + "&sgloaderPath=" + sgLoader + "&interval=120&characterEncoding=gbk&UseOneServer=false"; // URL
import java.io.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
public class Lineorder implements runnable{
String host; // 加载服务IP地址
String port = "8585"; // 端口
String port2 = "5258";
String db; // 数据库名
String table; // 要加载的表名
int psize; // 每次提交的行数
int thdCount; //线程数
static int sleepTime = 0; //每次提交sleep的毫秒数
String sgLoader;
String fileName ;
public Lineorder(String host, String db, String table, int psize, String sgLoader, String fileName){
this.host = host;
this.db = db;
this.table = table;
this.psize = psize;
this.sgLoader = sgLoader;
this.fileName = fileName;
}
@Override
public void run {
// TODO Auto-generated method stub
String url = "jdbc:gbload://" + host + ":" + port + "/" + db + "?failoverEnable=true&hostList=" + "10.10.3.233;10.10.3.234" + "&basePath=" + sgLoader + "&sgloaderPath=" + sgLoader + "&interval=120&characterEncoding=gbk&UseOneServer=false"; // URL
try {
Class.forName("cn.gbase.jdbc.LoadDriver").newInstance;
} catch (Exception e) {
e.printStackTrace;
return;
}
Connection conn = null;
PreparedStatement pstmt = null;
try {
System.out.println(url);
conn = DriverManager.getConnection(url, "gbase", "gbase20110531");
System.out.println("getConnection done");
} catch (SQLException e1) {
e1.printStackTrace;
return;
}
if(conn == null){
System.err.println("conn is null");
return;
}
String sql = "insert into " + table + " values
//String sql = "insert into lineorder (lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode) values
File file = new File(fileName);
BufferedReader reader = null;
try{
reader = new BufferedReader(new fileReader(file));
String tempString = null;
String lines = null;
pstmt = conn.prepareStatement(sql);
String threadName = Thread.currentThread.getName;
int line = 0;
while ((tempString = reader.readLine) != null) {
lines = tempString.split("\\|");
pstmt.setString(1, lines[0]);
pstmt.setString(2, lines[1]);
pstmt.setString(3, lines[2]);
pstmt.setString(4, lines[3]);
pstmt.setString(5, lines[4]);
pstmt.setString(6, lines[5]);
pstmt.setString(7, lines[6]);
pstmt.setString(8, lines[7]);
pstmt.setString(9, lines[8]);
pstmt.setString(10, lines[9]);
pstmt.setString(11, lines[10]);
pstmt.setString(12, lines[11]);
pstmt.setString(13, lines[12]);
pstmt.setString(14, lines[13]);
pstmt.setString(15, lines[14]);
pstmt.setString(16, lines[15]);
pstmt.setString(17, lines[16]);
pstmt.addBatch;
line++;
if(line == psize){
line = 0;
pstmt.executeBatch;
conn.commit;
pstmt.clearBatch;
}
}
conn.commit;
pstmt.clearBatch;
reader.close;
System.out.println(threadName+": ==================>LOAD END! " );
}catch(Exception e){
e.printStackTrace;
}finally{
if (reader != null) {
try {
reader.close;
} catch (IOException e) {
e.printStackTrace;
}
}
try {
if (pstmt != null)
pstmt.close;
} catch (SQLException e) {
e.printStackTrace;
}
try {
if(conn != null)
conn.close;
e.printStackTrace;
}
}
}
public static void main(String args) {
String host = args[0]; // 加载服务IP地址
String db = args[1]; // 数据库名
String table=args[2]; // 要加载的表名
int psize = Integer.parseInt(args[3]) ;
int thdCount = Integer.parseInt(args[4]);
String sgLoader = args[5]; //sgLoader路径
String fileName = args[6]; //加载的文件名
ArrayList loadThreads = new ArrayList(thdCount);
long beginTime = System.currentTimeMillis;
// 启动加载线程。
for(int i=1; i
Lineorder gbl = new Lineorder(host,db,table,psize,sgLoader,fileName);
Thread thd = new Thread(gbl);
loadThreads.add(thd);
}
for (Thread t : loadThreads) {
t.start;
}
try {
t.join;
} catch (InterruptedException e) {
e.printStackTrace;
}
}
long endTime = System.currentTimeMillis;
double useTime = (endTime - beginTime) / 1000; // 秒
//double speed = (psize * pcount * thdCount ) / useTime;
//System.out.println("rowCount: "+ psize * pcount * thdCount );
System.out.println("time: "+ useTime + " sec");
//System.out.println("speed: " + speed + " row/sec");
}
}
Java代码编译及执行执行:java Lineorder 192.168.2.188 testdb lineorder 10000 1 /opt/gbload_server/api_file/ /opt/gbload_server/data/lineorder.tbl
Lineorder为java编译出来的class文件
192.168.2.188为gbload_server运行所在机器的IP地址;
testdb:gbase 8a MPP cluster数据库中的数据库名称;
Lineorder:表名称;
10000:每次提交的数据量,建议10000~20000,但不是绝对,根据服务器的配置灵活调整,多测试几组数据找出最有的批量提交值;
1:线程数
/opt/ gbload_server /api_file/:sgloader-0.0.1.jar所在的路径
/opt/gbload_server/data/lineorder.tbl:需要加载的数据文件
测试加载,如下图所未,gbload加载成功三、datax部署
1、解压datax.zip到/opt目录
2、解压公司的datax_plugin_build3.0.zip解压包,对应的plugin下面有reader和writer,放到datax的plugin下对应目录下面
3、用gbload_server的gcluster-load-api-8.5.1.2-build_20241217080553.jar包,datax的plugin里gbase8amppgbloaderwriter和gbase8amppjdbcwriter对应的jar包,保持版本一致(gbload_server高版本与datax自带jar包兼容有问题)
4、 python datax.py -r mysqlreader -w gbase8amppgbloaderwriter 可以生成样例,可以根据生成的样例进行修改,生成json任务文件
5、执行python datax.py ${name}.json
任务执行最后,统计信息
来源:GBASE数据库