Last active
November 13, 2023 12:38
-
-
Save yanweijia/208e3262e0ce44c069ddd870230d6639 to your computer and use it in GitHub Desktop.
外部数据与 db 数据比对 & 增量更新工具类
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import java.util.function.BiConsumer; | |
import java.util.function.BiPredicate; | |
import java.util.function.Consumer; | |
import java.util.function.Function; | |
import java.util.stream.Collectors; | |
/**
 * Incremental-update utility: compares a full data set obtained from an external source with
 * the full data set stored in the database and picks out the records to insert, delete and
 * update.
 * <p>
 * Scenario: the database holds the full data set and a third-party API also returns the full
 * data set; the two are compared and only the differences are written to the database. This
 * fits full synchronisation from an external system and avoids statements such as
 * {@code ON DUPLICATE KEY UPDATE}, which helps reduce database deadlocks. Everything is held in
 * memory, so keep the volume moderate (fewer than about 500,000 records is recommended).
 * <p>
 * Records are identified by the value produced by {@code keyMapper}. When a list is folded into
 * a map keyed by that value, a later element with a duplicate key replaces the earlier one.
 * Result lists follow the iteration order of the source map; for the list overloads that is the
 * order in which each key first appears in the input list.
 *
 * @author yanweijia
 */
public class IncreUpdateUtil {

    /**
     * Returns the external records whose key is not present in the database data.
     *
     * @param allDataList      full data set returned by the external API; must not be null
     * @param dbDataList       full data set read from the database; must not be null
     * @param keyMapper        extracts the unique key of a record, e.g. {@code CorpTagGroup::getGroupId}
     * @param newDataOperation optional (may be null) initialisation applied in place to every
     *                         record selected for insertion, e.g. setting an insert timestamp
     * @param <K>              key type
     * @param <V>              record type
     * @return records to insert; these are the same objects that are in {@code allDataList}
     * @throws NullPointerException if either list or {@code keyMapper} is null
     * @see #filterNewDataList(Map, Map, Consumer)
     */
    public static <K, V> List<V> filterNewDataList(List<V> allDataList, List<V> dbDataList,
                                                   Function<V, K> keyMapper, Consumer<V> newDataOperation) {
        return filterNewDataList(toKeyedMap(allDataList, keyMapper, "allDataList"),
                toKeyedMap(dbDataList, keyMapper, "dbDataList"), newDataOperation);
    }

    /**
     * Returns the database records whose key no longer appears in the external data.
     *
     * @param allDataList full data set returned by the external API; must not be null
     * @param dbDataList  full data set read from the database; must not be null
     * @param keyMapper   extracts the unique key of a record
     * @param <K>         key type
     * @param <V>         record type
     * @return records to delete; these are the same objects that are in {@code dbDataList}
     * @throws NullPointerException if either list or {@code keyMapper} is null
     * @see #filterRemoveList(Map, Map)
     */
    public static <K, V> List<V> filterRemoveList(List<V> allDataList, List<V> dbDataList,
                                                  Function<V, K> keyMapper) {
        return filterRemoveList(toKeyedMap(allDataList, keyMapper, "allDataList"),
                toKeyedMap(dbDataList, keyMapper, "dbDataList"));
    }

    /**
     * Returns the database records that exist on both sides and for which {@code condition}
     * reports a change, after applying {@code updateFieldBiConsumer} to each of them.
     *
     * @param allDataList           full data set returned by the external API; must not be null
     * @param dbDataList            full data set read from the database; must not be null
     * @param keyMapper             extracts the unique key of a record
     * @param condition             receives (external record, database record) and returns true
     *                              when the database record must be updated; must not be null
     * @param updateFieldBiConsumer optional (may be null); receives (external record, database
     *                              record) and copies the changed fields onto the database record
     * @param <K>                   key type
     * @param <V>                   record type
     * @return the database records to update, possibly modified in place. They are the same
     * objects that are in {@code dbDataList}.
     * @throws NullPointerException if either list, {@code keyMapper} or {@code condition} is null
     * @see #filterUpdateList(Map, Map, BiPredicate, BiConsumer)
     */
    public static <K, V> List<V> filterUpdateList(List<V> allDataList, List<V> dbDataList,
                                                  Function<V, K> keyMapper, BiPredicate<V, V> condition,
                                                  BiConsumer<V, V> updateFieldBiConsumer) {
        return filterUpdateList(toKeyedMap(allDataList, keyMapper, "allDataList"),
                toKeyedMap(dbDataList, keyMapper, "dbDataList"), condition, updateFieldBiConsumer);
    }

    /**
     * Returns the values of {@code allMap} whose key is absent from {@code dbDataMap}
     * (the key-set difference external minus database), in {@code allMap}'s iteration order.
     *
     * @param allMap           external data keyed by unique key; must not be null
     * @param dbDataMap        database data keyed by unique key; must not be null
     * @param newDataOperation optional (may be null) initialisation applied in place to every
     *                         record selected for insertion
     * @param <K>              key type
     * @param <V>              record type
     * @return records to insert
     * @throws NullPointerException if either map is null
     */
    public static <K, V> List<V> filterNewDataList(Map<K, V> allMap, Map<K, V> dbDataMap,
                                                   Consumer<V> newDataOperation) {
        Objects.requireNonNull(allMap, "allMap must not be null");
        Objects.requireNonNull(dbDataMap, "dbDataMap must not be null");
        List<V> newDataList = new ArrayList<>();
        for (Map.Entry<K, V> entry : allMap.entrySet()) {
            if (dbDataMap.containsKey(entry.getKey())) {
                continue;
            }
            V item = entry.getValue();
            // caller-supplied initialisation, e.g. filling audit columns before the insert
            if (newDataOperation != null) {
                newDataOperation.accept(item);
            }
            newDataList.add(item);
        }
        return newDataList;
    }

    /**
     * Returns the values of {@code dbDataMap} whose key is absent from {@code allMap}
     * (the key-set difference database minus external), in {@code dbDataMap}'s iteration order.
     *
     * @param allMap    external data keyed by unique key; must not be null
     * @param dbDataMap database data keyed by unique key; must not be null
     * @param <K>       key type
     * @param <V>       record type
     * @return records to delete
     * @throws NullPointerException if either map is null
     */
    public static <K, V> List<V> filterRemoveList(Map<K, V> allMap, Map<K, V> dbDataMap) {
        Objects.requireNonNull(allMap, "allMap must not be null");
        Objects.requireNonNull(dbDataMap, "dbDataMap must not be null");
        List<V> removeDataList = new ArrayList<>();
        for (Map.Entry<K, V> entry : dbDataMap.entrySet()) {
            if (!allMap.containsKey(entry.getKey())) {
                removeDataList.add(entry.getValue());
            }
        }
        return removeDataList;
    }

    /**
     * For every key present in both maps, tests (external value, database value) with
     * {@code condition}. For each match it applies {@code updateFieldBiConsumer} and collects
     * the database value. Results follow {@code allMap}'s iteration order.
     *
     * @param allMap                external data keyed by unique key; must not be null
     * @param dbDataMap             database data keyed by unique key; must not be null
     * @param condition             returns true when the database value must be updated; must not be null
     * @param updateFieldBiConsumer optional (may be null); copies changed fields from the external
     *                              value onto the database value
     * @param <K>                   key type
     * @param <V>                   record type
     * @return database values to update, possibly modified in place
     * @throws NullPointerException if either map or {@code condition} is null
     */
    public static <K, V> List<V> filterUpdateList(Map<K, V> allMap, Map<K, V> dbDataMap,
                                                  BiPredicate<V, V> condition,
                                                  BiConsumer<V, V> updateFieldBiConsumer) {
        Objects.requireNonNull(allMap, "allMap must not be null");
        Objects.requireNonNull(dbDataMap, "dbDataMap must not be null");
        Objects.requireNonNull(condition, "condition must not be null");
        List<V> updateDataList = new ArrayList<>();
        for (Map.Entry<K, V> entry : allMap.entrySet()) {
            K key = entry.getKey();
            // only keys present on both sides are update candidates
            if (!dbDataMap.containsKey(key)) {
                continue;
            }
            V data1 = entry.getValue();
            V dbData = dbDataMap.get(key);
            if (condition.test(data1, dbData)) {
                // copy the changed fields of data1 onto dbData
                if (updateFieldBiConsumer != null) {
                    updateFieldBiConsumer.accept(data1, dbData);
                }
                updateDataList.add(dbData);
            }
        }
        return updateDataList;
    }

    /**
     * Folds {@code list} into a map keyed by {@code keyMapper}. On duplicate keys the later
     * element replaces the earlier one. A LinkedHashMap keeps each key at the position of its
     * first occurrence, so downstream results come out in a predictable order.
     */
    private static <K, V> Map<K, V> toKeyedMap(List<V> list, Function<V, K> keyMapper, String name) {
        Objects.requireNonNull(list, name + " must not be null");
        Objects.requireNonNull(keyMapper, "keyMapper must not be null");
        return list.stream().collect(Collectors.toMap(keyMapper, Function.identity(),
                (oldVal, newVal) -> newVal, LinkedHashMap::new));
    }
}
/* | |
// 测试样例: | |
public void zengliangUtil() { | |
List<CorpTagGroup> corpTagListWx = new ArrayList<>(); | |
List<CorpTagGroup> corpTagListDb = new ArrayList<>(); | |
corpTagListWx.add(new CorpTagGroup(null, "groupId1", "标签组1", null, "companyCode", 1, null, null, null)); | |
corpTagListWx.add(new CorpTagGroup(null, "groupId2", "标签组2", null, "companyCode", 2, null, null, null)); | |
corpTagListWx.add(new CorpTagGroup(null, "groupId3", "标签组3改名了", null, "companyCode", 3, null, null, null)); | |
corpTagListDb.add(new CorpTagGroup(2L, "groupId2", "标签组2", null, "companyCode", 2, null, null, true)); | |
corpTagListDb.add(new CorpTagGroup(3L, "groupId3", "标签组3", null, "companyCode", 3, null, null, true)); | |
corpTagListDb.add(new CorpTagGroup(4L, "groupId4", "标签组4", null, "companyCode", 4, null, null, true)); | |
List<CorpTagGroup> listToInsert = IncreUpdateUtil.filterNewDataList(corpTagListWx, corpTagListDb, CorpTagGroup::getGroupId | |
, item -> { | |
item.setInserttime(new Date()); | |
item.setIsactive(true); | |
}); | |
List<CorpTagGroup> listToRemove = IncreUpdateUtil.filterRemoveList(corpTagListWx, corpTagListDb, CorpTagGroup::getGroupId); | |
List<CorpTagGroup> listToUpdate = IncreUpdateUtil.filterUpdateList(corpTagListWx, corpTagListDb, CorpTagGroup::getGroupId | |
, (wx, db) -> !StringUtils.equals(wx.getGroupName(), db.getGroupName()) || !wx.getGroupOrder().equals(db.getGroupOrder()) | |
, (wx, db) -> { | |
db.setGroupOrder(wx.getGroupOrder()); | |
db.setGroupName(wx.getGroupName()); | |
db.setUpdatetime(new Date()); | |
db.setIsactive(true); | |
}); | |
System.out.println(listToInsert); | |
System.out.println(listToRemove); | |
System.out.println(listToUpdate); | |
System.out.println("done"); | |
} | |
//运行结果: | |
//Connected to the target VM, address: '127.0.0.1:64356', transport: 'socket' | |
//[CorpTagGroup(id=null, groupId=groupId1, groupName=标签组1, createTime=null, companyCode=companyCode, groupOrder=1, inserttime=Wed Jul 29 10:50:11 CST 2020, updatetime=null, isactive=true)] | |
//[CorpTagGroup(id=4, groupId=groupId4, groupName=标签组4, createTime=null, companyCode=companyCode, groupOrder=4, inserttime=null, updatetime=null, isactive=true)] | |
//[CorpTagGroup(id=3, groupId=groupId3, groupName=标签组3改名了, createTime=null, companyCode=companyCode, groupOrder=3, inserttime=null, updatetime=Wed Jul 29 10:50:11 CST 2020, isactive=true)] | |
//done | |
//Disconnected from the target VM, address: '127.0.0.1:64356', transport: 'socket' | |
//Process finished with exit code 0 | |
*/ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import java.util.function.BiConsumer; | |
import java.util.function.BiPredicate; | |
import java.util.function.Consumer; | |
import java.util.function.Function; | |
import java.util.stream.Collectors; | |
/**
 * Incremental-update utility: compares a full data set obtained from an external source with
 * the full data set stored in the database and picks out the records to insert, delete and
 * update. External records ({@code V1}) and database records ({@code V2}) may be of different
 * types, provided both map to the same key type {@code K}.
 * <p>
 * Scenario: the database holds the full data set and a third-party API also returns the full
 * data set; the two are compared and only the differences are written to the database. This
 * fits full synchronisation from an external system and avoids statements such as
 * {@code ON DUPLICATE KEY UPDATE}, which helps reduce database deadlocks. Everything is held in
 * memory, so keep the volume moderate (fewer than about 500,000 records is recommended).
 * <p>
 * Records are identified by the value produced by the key mappers. When a list is folded into a
 * map keyed by that value, a later element with a duplicate key replaces the earlier one.
 * Result lists follow the iteration order of the source map; for the list overloads that is the
 * order in which each key first appears in the input list.
 *
 * @author weijia.yan
 * @date 2023/11/13
 */
public class IncreUpdateUtil {

    /**
     * Returns the external records whose key is not present in the database data.
     *
     * @param allDataList      full data set returned by the external API; must not be null
     * @param dbDataList       full data set read from the database; must not be null
     * @param key1Mapper       extracts the unique key of an external record, e.g. {@code CorpTagGroup::getGroupId}
     * @param key2Mapper       extracts the unique key of a database record. Its keys are matched
     *                         against those of {@code key1Mapper} via {@code equals}/{@code hashCode}.
     * @param newDataOperation optional (may be null) initialisation applied in place to every
     *                         record selected for insertion, e.g. setting an insert timestamp
     * @param <K>              key type
     * @param <V1>             external record type
     * @param <V2>             database record type
     * @return records to insert; these are the same objects that are in {@code allDataList}
     * @throws NullPointerException if either list or either key mapper is null
     * @see #filterNewDataList(Map, Map, Consumer)
     */
    public static <K, V1, V2> List<V1> filterNewDataList(List<V1> allDataList, List<V2> dbDataList,
                                                         Function<V1, K> key1Mapper, Function<V2, K> key2Mapper,
                                                         Consumer<V1> newDataOperation) {
        return filterNewDataList(toKeyedMap(allDataList, key1Mapper, "allDataList"),
                toKeyedMap(dbDataList, key2Mapper, "dbDataList"), newDataOperation);
    }

    /**
     * Returns the database records whose key no longer appears in the external data.
     *
     * @param allDataList full data set returned by the external API; must not be null
     * @param dbDataList  full data set read from the database; must not be null
     * @param key1Mapper  extracts the unique key of an external record
     * @param key2Mapper  extracts the unique key of a database record
     * @param <K>         key type
     * @param <V1>        external record type
     * @param <V2>        database record type
     * @return records to delete; these are the same objects that are in {@code dbDataList}
     * @throws NullPointerException if either list or either key mapper is null
     * @see #filterRemoveList(Map, Map)
     */
    public static <K, V1, V2> List<V2> filterRemoveList(List<V1> allDataList, List<V2> dbDataList,
                                                        Function<V1, K> key1Mapper, Function<V2, K> key2Mapper) {
        return filterRemoveList(toKeyedMap(allDataList, key1Mapper, "allDataList"),
                toKeyedMap(dbDataList, key2Mapper, "dbDataList"));
    }

    /**
     * Returns the database records that exist on both sides and for which {@code condition}
     * reports a change, after applying {@code updateFieldBiConsumer} to each of them.
     *
     * @param allDataList           full data set returned by the external API; must not be null
     * @param dbDataList            full data set read from the database; must not be null
     * @param key1Mapper            extracts the unique key of an external record
     * @param key2Mapper            extracts the unique key of a database record
     * @param condition             compares non-key fields and returns true when the database
     *                              record must be updated, e.g. the name changed for the same id:
     *                              {@code (extData, dbData) -> !Objects.equals(extData.getName(), dbData.getName())};
     *                              must not be null
     * @param updateFieldBiConsumer optional (may be null); run for every record that needs an update.
     *                              The returned list then holds the modified database records, e.g.
     *                              {@code (extData, dbData) -> dbData.setName(extData.getName())}
     * @param <K>                   key type
     * @param <V1>                  external record type
     * @param <V2>                  database record type
     * @return the database records to update, possibly modified in place. They are the same
     * objects that are in {@code dbDataList}.
     * @throws NullPointerException if either list, either key mapper or {@code condition} is null
     * @see #filterUpdateList(Map, Map, BiPredicate, BiConsumer)
     */
    public static <K, V1, V2> List<V2> filterUpdateList(List<V1> allDataList, List<V2> dbDataList,
                                                        Function<V1, K> key1Mapper, Function<V2, K> key2Mapper,
                                                        BiPredicate<V1, V2> condition,
                                                        BiConsumer<V1, V2> updateFieldBiConsumer) {
        return filterUpdateList(toKeyedMap(allDataList, key1Mapper, "allDataList"),
                toKeyedMap(dbDataList, key2Mapper, "dbDataList"), condition, updateFieldBiConsumer);
    }

    /**
     * Returns the values of {@code allMap} whose key is absent from {@code dbDataMap}
     * (the key-set difference external minus database), in {@code allMap}'s iteration order.
     *
     * @param allMap           external data keyed by unique key; must not be null
     * @param dbDataMap        database data keyed by unique key; must not be null
     * @param newDataOperation optional (may be null) initialisation applied in place to every
     *                         record selected for insertion
     * @param <K>              key type
     * @param <V1>             external record type
     * @param <V2>             database record type
     * @return records to insert
     * @throws NullPointerException if either map is null
     */
    public static <K, V1, V2> List<V1> filterNewDataList(Map<K, V1> allMap, Map<K, V2> dbDataMap,
                                                         Consumer<V1> newDataOperation) {
        Objects.requireNonNull(allMap, "allMap must not be null");
        Objects.requireNonNull(dbDataMap, "dbDataMap must not be null");
        List<V1> newDataList = new ArrayList<>();
        for (Map.Entry<K, V1> entry : allMap.entrySet()) {
            if (dbDataMap.containsKey(entry.getKey())) {
                continue;
            }
            V1 item = entry.getValue();
            // caller-supplied initialisation, e.g. filling audit columns before the insert
            if (null != newDataOperation) {
                newDataOperation.accept(item);
            }
            newDataList.add(item);
        }
        return newDataList;
    }

    /**
     * Returns the values of {@code dbDataMap} whose key is absent from {@code allMap}
     * (the key-set difference database minus external), in {@code dbDataMap}'s iteration order.
     *
     * @param allMap    external data keyed by unique key; must not be null
     * @param dbDataMap database data keyed by unique key; must not be null
     * @param <K>       key type
     * @param <V1>      external record type
     * @param <V2>      database record type
     * @return records to delete
     * @throws NullPointerException if either map is null
     */
    public static <K, V1, V2> List<V2> filterRemoveList(Map<K, V1> allMap, Map<K, V2> dbDataMap) {
        Objects.requireNonNull(allMap, "allMap must not be null");
        Objects.requireNonNull(dbDataMap, "dbDataMap must not be null");
        List<V2> removeDataList = new ArrayList<>();
        for (Map.Entry<K, V2> entry : dbDataMap.entrySet()) {
            if (!allMap.containsKey(entry.getKey())) {
                removeDataList.add(entry.getValue());
            }
        }
        return removeDataList;
    }

    /**
     * For every key present in both maps, tests (external value, database value) with
     * {@code condition}. For each match it applies {@code updateFieldBiConsumer} and collects
     * the database value. Results follow {@code allMap}'s iteration order.
     *
     * @param allMap                external data keyed by unique key; must not be null
     * @param dbDataMap             database data keyed by unique key; must not be null
     * @param condition             returns true when the database value must be updated; must not be null
     * @param updateFieldBiConsumer optional (may be null); copies changed fields from the external
     *                              value onto the database value
     * @param <K>                   key type
     * @param <V1>                  external record type
     * @param <V2>                  database record type
     * @return database values to update, possibly modified in place
     * @throws NullPointerException if either map or {@code condition} is null
     */
    public static <K, V1, V2> List<V2> filterUpdateList(Map<K, V1> allMap, Map<K, V2> dbDataMap,
                                                        BiPredicate<V1, V2> condition,
                                                        BiConsumer<V1, V2> updateFieldBiConsumer) {
        Objects.requireNonNull(allMap, "allMap must not be null");
        Objects.requireNonNull(dbDataMap, "dbDataMap must not be null");
        Objects.requireNonNull(condition, "condition must not be null");
        List<V2> updateDataList = new ArrayList<>();
        for (Map.Entry<K, V1> entry : allMap.entrySet()) {
            K key = entry.getKey();
            // only keys present on both sides are update candidates
            if (!dbDataMap.containsKey(key)) {
                continue;
            }
            V1 data1 = entry.getValue();
            V2 dbData = dbDataMap.get(key);
            if (condition.test(data1, dbData)) {
                // copy the changed fields of data1 onto dbData
                if (null != updateFieldBiConsumer) {
                    updateFieldBiConsumer.accept(data1, dbData);
                }
                updateDataList.add(dbData);
            }
        }
        return updateDataList;
    }

    /**
     * Folds {@code list} into a map keyed by {@code keyMapper}. On duplicate keys the later
     * element replaces the earlier one. A LinkedHashMap keeps each key at the position of its
     * first occurrence, so downstream results come out in a predictable order.
     */
    private static <K, V> Map<K, V> toKeyedMap(List<V> list, Function<V, K> keyMapper, String name) {
        Objects.requireNonNull(list, name + " must not be null");
        Objects.requireNonNull(keyMapper, "key mapper for " + name + " must not be null");
        return list.stream().collect(Collectors.toMap(keyMapper, Function.identity(),
                (oldVal, newVal) -> newVal, LinkedHashMap::new));
    }
}
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment