Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c764aff
[feat-877][hbase]refactory hbase
May 24, 2022
41a376f
[feat-874][jdbc]the sql WHERE condition add [splitKey is null] for th…
May 24, 2022
039c7c1
[feat-880][hdfs]support orc writer when fullcolname size greater than…
May 24, 2022
b14ce8b
[feat-882][inceptor]Use file reading instead of JDBC
May 25, 2022
1dba62d
[hotfix-884][doris][ftp]print dirty data when dirty type is log for d…
May 25, 2022
1495109
[feat-875][doris] postgresql sync support array type.
May 24, 2022
ba18634
[feat-861][protobuf]add protobuf format
Paddy0523 May 23, 2022
122a701
[hotfix-889][Test] Fix chunjun package failed because test code not f…
FlechazoW May 26, 2022
ab6d594
[feat-857][converter]optimize RowSizeCalculator
Paddy0523 May 23, 2022
6d82f9c
[feat-#893][jdbc]range split strategy support all numeric type
Paddy0523 May 28, 2022
f4b6a49
[feat-#846][hdfs]hdfs sql support partitionColumn
Paddy0523 May 29, 2022
4b8b9a1
[hotfix-842][chunjun-sql] fix the problem of "java.lang.SecurityExcep…
lvyanquan Jun 1, 2022
c0e3e7e
[feat-#903][jdbc]SQL supports the orderByColumn configuration item
Paddy0523 Jun 1, 2022
8f50f2f
[hotfix-#906][typo]Correct spelling mistakes BigIntegerAccmulator -> …
Paddy0523 Jun 1, 2022
052e2e1
[hotfix-820][binlog] Fixed Canal Parse Exception
libailin May 19, 2022
209bc43
[hotfix-#764][jdbc]fix duplicate data occurred when use jdbc polling …
Paddy0523 Jun 1, 2022
f1808da
[hotfix-#899][conf] fix config check error
huyuanfeng2018 May 31, 2022
9bef26f
[hotfix-#913][binlog] Fixed binlog plugin not being able to specify b…
Jun 6, 2022
8856cc5
[feat-#914][jdbcSplit]1.incremental mode with parallelism >1 ,support…
Paddy0523 Jun 6, 2022
4a38bbb
[hotfix-#919][jdbc]1.Fixed loss of endLocation when recovering from c…
Paddy0523 Jun 6, 2022
d1e8be5
[feat-#929][bin] provided some shell scripts for reducing parameters …
lvyanquan Jun 9, 2022
23215cc
[hotfix-#926][example]Remove timestamp field in stream.json
Paddy0523 Jun 9, 2022
d41e673
[docfix-DTStack#936][docs] replace kafka-sink's consumerSettings to p…
Jun 10, 2022
e98d350
[fix-logminer] #997 just load archive log when ORA-00310 exception ha…
Jun 24, 2022
f3b1362
Merge remote-tracking branch 'origin/master' into master_dujie
Jun 24, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.alibaba.otter.canal.parse.inbound.mysql.ddl.DruidDdlParser;
import com.alibaba.otter.canal.protocol.CanalEntry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand Down Expand Up @@ -286,9 +287,18 @@ private void parseColumnList(
@Override
protected IDeserializationConverter createInternalConverter(String type) {
String substring = type;
int index = type.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL);
// 为了支持无符号类型 如 int unsigned
if (StringUtils.contains(substring, ConstantValue.DATA_TYPE_UNSIGNED)) {
substring = substring.replaceAll(ConstantValue.DATA_TYPE_UNSIGNED, "").trim();
}

if (StringUtils.contains(substring, ConstantValue.DATA_TYPE_UNSIGNED_LOWER)) {
substring = substring.replaceAll(ConstantValue.DATA_TYPE_UNSIGNED_LOWER, "").trim();
}

int index = substring.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL);
if (index > 0) {
substring = type.substring(0, index);
substring = substring.substring(0, index);
}
switch (substring.toUpperCase(Locale.ENGLISH)) {
case "BIT":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public boolean sink(
processRowChange(rowChange, schema, table, executeTime);
} catch (WriteRecordException e) {
// todo 脏数据记录
if (LOG.isTraceEnabled()) {
LOG.trace(
if (LOG.isDebugEnabled()) {
LOG.debug(
"write error rowData, rowData = {}, e = {}",
e.getRowData().toString(),
ExceptionUtil.getErrorMessage(e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,7 @@ private static Connection getConnectionWithKerberos(

UserGroupInformation ugi;
try {
ugi =
KerberosUtil.loginAndReturnUgi(
conf.get(KerberosUtil.KEY_PRINCIPAL_FILE), principal, keytabFileName);
ugi = KerberosUtil.loginAndReturnUgi(conf, principal, keytabFileName);
} catch (Exception e) {
throw new RuntimeException("Login kerberos error:", e);
}
Expand Down
3 changes: 3 additions & 0 deletions chunjun-connectors/chunjun-connector-inceptor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@
<excludes>
<exclude>org.apache.hadoop.security.GroupMappingServiceProvider</exclude>
<exclude>org.apache.hadoop.fs.*</exclude>
<exclude>org.apache.hadoop.fs.permission.*</exclude>

<exclude>org.apache.hadoop.conf.*</exclude>

Expand All @@ -246,6 +247,8 @@
<exclude>org.apache.hadoop.filecache.*</exclude>

<exclude>org.apache.hadoop.io.retry.*</exclude>
<exclude>org.apache.hadoop.ipc.RemoteException</exclude>
<exclude>org.apache.hadoop.ipc.StandbyException</exclude>
<exclude>org.apache.hadoop.hdfs.server.*</exclude>
<exclude>org.apache.hadoop.hdfs.server.namenode.*</exclude>
<exclude>org.apache.hadoop.hdfs.server.namenode.ha.*</exclude>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.hadoop.hdfs.server.namenode.ha;

import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class ConfiguredFailoverProxyProvider<T> extends AbstractNNFailoverProxyProvider<T> {
private static final Log LOG = LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
private final Configuration conf;
private final List<ConfiguredFailoverProxyProvider.AddressRpcProxyPair<T>> proxies =
new ArrayList();
private final UserGroupInformation ugi;
private final Class<T> xface;
private int currentProxyIndex = 0;

public ConfiguredFailoverProxyProvider(Configuration conf, URI uri, Class<T> xface) {
Preconditions.checkArgument(
xface.isAssignableFrom(NamenodeProtocols.class),
"Interface class %s is not a valid NameNode protocol!");
this.xface = xface;
this.conf = new Configuration(conf);
int maxRetries = this.conf.getInt("dfs.client.failover.connection.retries", 0);
this.conf.setInt("ipc.client.connect.max.retries", maxRetries);
int maxRetriesOnSocketTimeouts =
this.conf.getInt("dfs.client.failover.connection.retries.on.timeouts", 0);
this.conf.setInt("ipc.client.connect.max.retries.on.timeouts", maxRetriesOnSocketTimeouts);

try {
this.ugi = UserGroupInformation.getCurrentUser();
Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses(conf);
Map<String, InetSocketAddress> addressesInNN = (Map) map.get(uri.getHost());
if (addressesInNN != null && addressesInNN.size() != 0) {
Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
Iterator var9 = addressesOfNns.iterator();

while (var9.hasNext()) {
InetSocketAddress address = (InetSocketAddress) var9.next();
this.proxies.add(
new ConfiguredFailoverProxyProvider.AddressRpcProxyPair(address));
}

HAUtil.cloneDelegationTokenForLogicalUri(this.ugi, uri, addressesOfNns);
} else {
throw new RuntimeException(
"Could not find any configured addresses for URI " + uri);
}
} catch (IOException var11) {
throw new RuntimeException(var11);
}
}

public Class<T> getInterface() {
return this.xface;
}

public synchronized ProxyInfo<T> getProxy() {
ConfiguredFailoverProxyProvider.AddressRpcProxyPair<T> current =
(ConfiguredFailoverProxyProvider.AddressRpcProxyPair)
this.proxies.get(this.currentProxyIndex);
if (current.namenode == null) {
try {
current.namenode =
NameNodeProxies.createNonHAProxy(
this.conf,
current.address,
this.xface,
this.ugi,
false,
this.fallbackToSimpleAuth)
.getProxy();
} catch (IOException var3) {
LOG.error("Failed to create RPC proxy to NameNode", var3);
throw new RuntimeException(var3);
}
}

return new ProxyInfo(current.namenode, current.address.toString());
}

public synchronized void performFailover(T currentProxy) {
this.currentProxyIndex = (this.currentProxyIndex + 1) % this.proxies.size();
}

public synchronized void close() throws IOException {
Iterator var1 = this.proxies.iterator();

while (var1.hasNext()) {
ConfiguredFailoverProxyProvider.AddressRpcProxyPair<T> proxy =
(ConfiguredFailoverProxyProvider.AddressRpcProxyPair) var1.next();
if (proxy.namenode != null) {
if (proxy.namenode instanceof Closeable) {
((Closeable) proxy.namenode).close();
} else {
RPC.stopProxy(proxy.namenode);
}
}
}
}

public boolean useLogicalURI() {
return true;
}

private static class AddressRpcProxyPair<T> {
public final InetSocketAddress address;
public T namenode;

public AddressRpcProxyPair(InetSocketAddress address) {
this.address = address;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
*/
public class MongodbOutputFormatBuilder extends BaseRichOutputFormatBuilder {
MongodbDataSyncConf mongodbDataSyncConf;
String upsertKey;

public MongodbOutputFormatBuilder(MongodbDataSyncConf mongodbDataSyncConf) {
this.mongodbDataSyncConf = mongodbDataSyncConf;
this.upsertKey = mongodbDataSyncConf.getReplaceKey();
MongoClientConf mongoClientConf =
MongoClientConfFactory.createMongoClientConf(mongodbDataSyncConf);
MongodbOutputFormat.WriteMode writeMode =
Expand All @@ -50,12 +52,12 @@ public MongodbOutputFormatBuilder(MongodbDataSyncConf mongodbDataSyncConf) {

public MongodbOutputFormatBuilder(
MongoClientConf mongoClientConf, String key, MongodbOutputFormat.WriteMode writeMode) {
this.upsertKey = key;
this.format = new MongodbOutputFormat(mongoClientConf, key, writeMode);
}

@Override
protected void checkFormat() {
String upsertKey = mongodbDataSyncConf.getReplaceKey();
if (!StringUtils.isBlank(upsertKey)) {
List<FieldConf> fields = mongodbDataSyncConf.getColumn();
boolean flag = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
*/
public class MongoAllTableFunction extends AbstractAllTableFunction {

private static final int FETCH_SIZE = 1000;
private final int fetchSize;
private final MongoClientConf mongoClientConf;
private transient MongoClient mongoClient;
private transient MongoCollection collection;
Expand All @@ -57,6 +57,7 @@ public MongoAllTableFunction(
String[] fieldNames) {
super(fieldNames, keyNames, lookupConf, new MongodbRowConverter(rowType, fieldNames));
this.mongoClientConf = mongoClientConf;
this.fetchSize = lookupConf.getFetchSize();
}

@Override
Expand All @@ -70,7 +71,7 @@ protected void loadData(Object cacheRef) {
Map<String, List<Map<String, Object>>> tmpCache =
(Map<String, List<Map<String, Object>>>) cacheRef;

FindIterable<Document> findIterable = collection.find().limit(FETCH_SIZE);
FindIterable<Document> findIterable = collection.find().limit(fetchSize);
MongoCursor<Document> mongoCursor = findIterable.iterator();
while (mongoCursor.hasNext()) {
Document doc = mongoCursor.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* @author Ada Wong
Expand Down Expand Up @@ -86,7 +87,7 @@ public void handleAsyncInvoke(CompletableFuture<Collection<RowData>> future, Obj
basicDbObject.append(keyNames[i], keys[i]);
}

AtomicInteger atomicInteger = new AtomicInteger(0);
List<RowData> rowList = new CopyOnWriteArrayList<>();

Block<Document> block =
(document) -> {
Expand All @@ -96,15 +97,16 @@ public void handleAsyncInvoke(CompletableFuture<Collection<RowData>> future, Obj
} catch (Exception e) {
LOG.error("", e);
}
atomicInteger.incrementAndGet();
future.complete(Collections.singleton(row));
rowList.add(row);
};

SingleResultCallback<Void> callbackWhenFinished =
(result, t) -> {
if (atomicInteger.get() <= 0) {
if (rowList.size() <= 0) {
LOG.warn("Cannot retrieve the data from the database");
future.complete(Collections.EMPTY_LIST);
} else {
future.complete(rowList);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* @author Ada Wong
Expand Down Expand Up @@ -87,20 +88,21 @@ public void eval(CompletableFuture<Collection<RowData>> future, Object... keys)
basicDbObject.append(keyNames[i], keys[i]);
}

AtomicInteger atomicInteger = new AtomicInteger(0);
List<RowData> rowList = new CopyOnWriteArrayList<>();

Block<Document> block =
(document) -> {
RowData row = converter.toInternalLookup(document);
atomicInteger.incrementAndGet();
future.complete(Collections.singleton(row));
rowList.add(row);
};

SingleResultCallback<Void> callbackWhenFinished =
(result, t) -> {
if (atomicInteger.get() <= 0) {
if (rowList.size() <= 0) {
LOG.warn("Cannot retrieve the data from the database");
future.complete(Collections.EMPTY_LIST);
} else {
future.complete(rowList);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ public void setType(String type) {
this.type = type;
}

public boolean isOnline() {
return "ONLINE".equals(this.type);
}

@Override
public String toString() {
return "LogFile{"
Expand Down
Loading