使⽤SqlBulkCopy批量插⼊更新数据
在开发中遇到了⼀张表的数据因为只做了同步插⼊⽽没有做同步更新的操作,导致了百万数据不准确。⾯对⼤量数据需要更新,传统的循环逐条插⼊以及拼接1000条数据插⼊都⽐较耗时,⽹上有博主做出了相关测试。
根据以上场景,新建控制台程序。config添加数据库连接配置,sqlHelper连接更新数据源,sqlBulkCopyHelper连接更新⽬标库。
创建sqlHelper类
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace SqlBulkCopyHelper
{
public class sqlHelper
{
///<summary>
///数据库操作帮助类
///此段基础代码为SQLServer数据库帮助类
///如需操作MySQL
/// 1.将代码中Sql改为MySql
/
// 2.添加MySql.Data.dll引⽤(可通过官⽹或NuGet)
/// 3.using System.Data.SqlClient;替换为using MySql.Data.MySqlClient;
///</summary>
///<summary>
///数据库连接字符串
///</summary>
private static string connectionStr =
System.Configuration.ConfigurationManager.ConnectionStrings["ConnectionName"].ConnectionString;
public sqlHelper() { }
public sqlHelper(string connectionStr)
{
}
///<summary>
///得到连接对象
///</summary>
///<returns></returns>
public static SqlConnection GetConn()
{
SqlConnection sqlconn = null;
sqlconn = new SqlConnection(connectionStr);
return sqlconn;
}
///<summary>
///查询操作
///</summary>
///<param name="sql"></param>
///<returns></returns>
public static DataTable GetDataTable(string sql, params SqlParameter[] sp)
{
using (SqlConnection conn = GetConn())
{
conn.Open();
using (SqlDataAdapter sda = new SqlDataAdapter(sql, conn))
{
sda.SelectCommand.Parameters.AddRange(sp);
DataTable dt = new DataTable();
sda.Fill(dt);
return dt;
}
}
}
///<summary>
///增删改操作
/
//</summary>
///<param name="sql">sql语句</param>
///<returns>执⾏后的条数</returns>
public static int ExecuteNonQuery(string sql, params SqlParameter[] sp)
{
using (SqlConnection conn = GetConn())
{
conn.Open();
using (SqlCommand cmd = new SqlCommand(sql, conn))
{
cmd.Parameters.AddRange(sp);
int i = cmd.ExecuteNonQuery();
return i;
}
}
}
///<summary>
///执⾏⼀条SQL语句,返回⾸⾏⾸列
///</summary>
///<param name="sql">sql语句</param>
///<returns>⾸⾏⾸列</returns>
public static object ExecuteScalar(string sql, params SqlParameter[] sp)
{
using (SqlConnection conn = GetConn())
{
conn.Open();
using (SqlCommand cmd = new SqlCommand(sql, conn))
{
cmd.Parameters.AddRange(sp);
return cmd.ExecuteScalar();
}
}
}
}
}
创建sqlBulkCopyHelper
///<summary>
/// SqlBulkCopy 批量更新数据
///</summary>
///<param name="dataTable">数据集</param>
///<param name="crateTemplateSql">临时表创建字段</param>
///<param name="updateSql">更新语句</param>
public static void BulkUpdateData(DataTable dataTable, string crateTemplateSql, string updateSql)
{
ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.PerUserRoamingAndLocal);
using (var conn = new SqlConnection(ConfigurationManager.ConnectionStrings["DefaultConnection"].ConnectionString)) {
using (var command = new SqlCommand("", conn))
{
try
{
conn.Open();
//数据库并创建⼀个临时表来保存数据表的数据
command.CommandText = String.Format(" CREATE TABLE #TmpTable ({0})", crateTemplateSql);
command.ExecuteNonQuery();
//使⽤SqlBulkCopy 加载数据到临时表中
using (var bulkCopy = new SqlBulkCopy(conn))
{
foreach (DataColumn dcPrepped in dataTable.Columns)
{
bulkCopy.ColumnMappings.Add(dcPrepped.ColumnName, dcPrepped.ColumnName);
}
bulkCopy.BulkCopyTimeout = 660;
bulkCopy.DestinationTableName = "#TmpTable";
bulkCopy.WriteToServer(dataTable);
bulkCopy.Close();
}
// 执⾏Command命令使⽤临时表的数据去更新⽬标表中的数据然后删除临时表
command.CommandTimeout = 300;
command.CommandText = updateSql;
command.ExecuteNonQuery();
}
finally
{
conn.Close();
}
}
}
}
Program代码
///<summary>
///更新表数据
///</summary>
public static void update()
{
String sqlstring = @"";
System.Diagnostics.Stopwatch stopwatch = new Stopwatch();
stopwatch.Start(); // 开始监视代码运⾏时间
DataTable dt = sqlHelper.GetDataTable(sqlstring);
stopwatch.Stop(); // 停⽌监视
Console.WriteLine("执⾏查询sql⽤时:" + stopwatch.Elapsed.TotalSeconds + "秒,共查询到:" + dt.Rows.Count + "⾏");
String updateSql = @"Merge into Table AS T
Using #TmpTable AS S
ON (T.order_no = S.order_no and T.item_code = S.item_code )
WHEN MATCHED
THEN UPDATE SET T.[qty]=S.[qty],T.[total_amount]=S.[total_amount];";
String crateTemplateSql = @"
[order_no] [varchar](32) NULL,
[qty] [int] NULL,
[total_amount] [decimal](18, 2) NULL,
[item_code] [varchar](32) NULL,";
for (int i = 0; i < (dt.Rows.Count + 10000 - 1) / 10000; i++)
{
System.Diagnostics.Stopwatch stopwatch2 = new Stopwatch();
stopwatch2.Start();
sqlBulkCopyHelper.BulkUpdateData(dt.AsEnumerable().Skip(i * 10000).Take(10000).CopyToDataTable(), crateTemplateSql, updateSql);
stopwatch2.Stop();
Console.WriteLine("更新第" + (i + 1) + "次耗时:" + stopwatch2.Elapsed.TotalSeconds + "秒,剩余" + ((dt.Rows.Count + 10000 - 1) / 10000 - i - 1) + "次"); }
Console.ReadLine();
}
1.更新的时候,datatable数据量过⼤内存不够⽤,这⾥是分了⼀下页。
批量更新sql语句 2.还需要注意的就是sqlBulkCopy在使⽤的时候,视图或者是源表的字段⼤⼩写、类型必须与⽬标表⼀致。
3. Merge,使⽤merger语句可以将插⼊、更新、删除合并成⼀句,完成存在就更新不存在就插⼊的需求。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论