Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,25 @@ public static List<String> getTableIndex(String schema, String tableName, Connec
return indexList;
}

/**
* get primarykey
*
* @param schema
* @param tableName
* @param dbConn
* @return
* @throws SQLException
*/
public static List<String> getTablePrimaryKey(
String schema, String tableName, Connection dbConn) throws SQLException {
ResultSet rs = dbConn.getMetaData().getPrimaryKeys(null, schema, tableName);
List<String> indexList = new LinkedList<>();
while (rs.next()) {
String index = rs.getString(4);
if (StringUtils.isNotBlank(index)) indexList.add(index);
}
return indexList;
}
/**
* 关闭连接资源
*
Expand Down
143 changes: 143 additions & 0 deletions chunjun-connectors/chunjun-connector-vertica11/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>chunjun-connectors</artifactId>
<groupId>com.dtstack.chunjun</groupId>
<version>1.12-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<properties>
<vertx.version>3.9.7</vertx.version>
</properties>

<artifactId>chunjun-connector-vertica11</artifactId>
<name>ChunJun : Connectors : Vertica11</name>

<dependencies>
<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-connector-jdbc-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-jdbc-client</artifactId>
<version>${vertx.version}</version>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>3.4.5</version>
</dependency>

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.7</version>
</dependency>

<dependency>
<groupId>com.vertica.jdbc</groupId>
<artifactId>vertica-jdbc</artifactId>
<version>11.1.1-0</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>org.slf4j:slf4j-api</exclude>
<exclude>log4j:log4j</exclude>
<exclude>ch.qos.logback:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>copy-resources</id>
<!-- here the phase you need -->
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<tasks>
<copy todir="${basedir}/../../${dist.dir}/connector/vertica11/"
file="${basedir}/target/${project.artifactId}-${project.version}.jar"/>
<move file="${basedir}/../../${dist.dir}/connector/vertica11/${project.artifactId}-${project.version}.jar"
tofile="${basedir}/../../${dist.dir}/connector/vertica11/${project.artifactId}.jar"/>
<delete>
<fileset dir="${basedir}/../../${dist.dir}/connector/vertica11/"
includes="${project.artifactId}-*.jar"
excludes="${project.artifactId}.jar"/>
</delete>
</tasks>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package com.dtstack.chunjun.connector.vertica11.conf;

import com.dtstack.chunjun.connector.jdbc.conf.JdbcLookupConf;

import java.util.Map;

/** @author menghan on 2022/8/1. */
public class Vertica11LookupConf extends JdbcLookupConf {
/** vertx pool size */
protected int asyncPoolSize = 5;

protected Map<String, Object> poolConf;

public static Vertica11LookupConf build() {
return new Vertica11LookupConf();
}

public Map<String, Object> getPoolConf() {
return poolConf;
}

public Vertica11LookupConf setPoolConf(Map<String, Object> poolConf) {
this.poolConf = poolConf;
return this;
}

public int getAsyncPoolSize() {
return asyncPoolSize;
}

public Vertica11LookupConf setAsyncPoolSize(int asyncPoolSize) {
this.asyncPoolSize = asyncPoolSize;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package com.dtstack.chunjun.connector.vertica11.converter;

import com.dtstack.chunjun.conf.ChunJunCommonConf;
import com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter;
import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
import com.dtstack.chunjun.converter.IDeserializationConverter;
import com.dtstack.chunjun.converter.ISerializationConverter;
import com.dtstack.chunjun.element.ColumnRowData;
import com.dtstack.chunjun.element.column.BigDecimalColumn;
import com.dtstack.chunjun.element.column.BooleanColumn;
import com.dtstack.chunjun.element.column.BytesColumn;
import com.dtstack.chunjun.element.column.SqlDateColumn;
import com.dtstack.chunjun.element.column.StringColumn;
import com.dtstack.chunjun.element.column.TimeColumn;
import com.dtstack.chunjun.element.column.TimestampColumn;

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;

/** @author menghan */
public class Vertica11ColumnConverter extends JdbcColumnConverter {

public Vertica11ColumnConverter(RowType rowType, ChunJunCommonConf commonConf) {
super(rowType, commonConf);
}

/**
* Convert external database type to flink internal type
*
* @param type
* @return
*/
@Override
protected IDeserializationConverter createInternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case BOOLEAN:
return val -> new BooleanColumn(Boolean.parseBoolean(val.toString()));
case TINYINT:
case SMALLINT:
case INTEGER:
return val -> new BigDecimalColumn((Long) val);
case FLOAT:
return val -> new BigDecimalColumn((BigDecimal) val);
case DOUBLE:
case BIGINT:
return val -> new BigDecimalColumn((Long) val);
case DECIMAL:
return val -> new BigDecimalColumn((BigDecimal) val);
case CHAR:
case VARCHAR:
return val -> new StringColumn((String) val);
case DATE:
return val -> new SqlDateColumn((Date) val);
case TIME_WITHOUT_TIME_ZONE:
return val -> new TimeColumn((Time) val);
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return val -> new TimestampColumn((Timestamp) val);
case BINARY:
return val -> new BytesColumn((byte[]) val);
case VARBINARY:
return val -> new BytesColumn((byte[]) val);
default:
throw new UnsupportedOperationException("Unsupported type:" + type);
}
}

/**
* Convert data types inside flink to external database system types
*
* @param type
* @return
*/
@Override
protected ISerializationConverter<FieldNamedPreparedStatement> createExternalConverter(
LogicalType type) {
switch (type.getTypeRoot()) {
case BOOLEAN:
return (val, index, statement) ->
statement.setBoolean(
index, ((ColumnRowData) val).getField(index).asBoolean());
case TINYINT:
case SMALLINT:
case INTEGER:
return (val, index, statement) ->
statement.setLong(index, ((ColumnRowData) val).getField(index).asLong());
case FLOAT:
return (val, index, statement) ->
statement.setBigDecimal(
index, ((ColumnRowData) val).getField(index).asBigDecimal());
case DOUBLE:
case BIGINT:
return (val, index, statement) ->
statement.setLong(index, ((ColumnRowData) val).getField(index).asLong());
case DECIMAL:
return (val, index, statement) ->
statement.setBigDecimal(
index, ((ColumnRowData) val).getField(index).asBigDecimal());
case CHAR:
case VARCHAR:
return (val, index, statement) ->
statement.setString(
index, ((ColumnRowData) val).getField(index).asString());
case DATE:
return (val, index, statement) ->
statement.setDate(index, ((ColumnRowData) val).getField(index).asSqlDate());
case TIME_WITHOUT_TIME_ZONE:
return (val, index, statement) ->
statement.setTime(index, ((ColumnRowData) val).getField(index).asTime());
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
final int timestampPrecision = ((TimestampType) type).getPrecision();
return (val, index, statement) ->
statement.setTimestamp(
index, val.getTimestamp(index, timestampPrecision).toTimestamp());
case BINARY:
return (val, index, statement) -> statement.setBytes(index, val.getBinary(index));
default:
throw new UnsupportedOperationException("Unsupported type:" + type);
}
}
}
Loading