这里一个非常常见的问题是如何进行 upsert,这就是 MySQL 所称的 INSERT ... ON DUPLICATE UPDATE
并且标准支持作为 MERGE
操作的一部分。
鉴于 PostgreSQL 不直接支持它(在 pg 9.5 之前),你如何做到这一点?考虑以下:
CREATE TABLE testtable (
id integer PRIMARY KEY,
somedata text NOT NULL
);
INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');
现在假设您要“更新”元组 (2, 'Joe')
、(3, 'Alan')
,因此新表内容将是:
(1, 'fred'),
(2, 'Joe'), -- Changed value of existing tuple
(3, 'Alan') -- Added new tuple
这就是人们在讨论 upsert
时所谈论的内容。至关重要的是,任何方法都必须在存在多个事务在同一个表上工作的情况下是安全的 - 通过使用显式锁定或以其他方式防御由此产生的竞争条件。
此主题在 Insert, on duplicate update in PostgreSQL? 中进行了广泛讨论,但这是关于 MySQL 语法的替代方案,并且随着时间的推移,它的一些不相关的细节越来越多。我正在研究确定的答案。
这些技术对于“如果不存在就插入,否则什么都不做”也很有用,即“在重复键忽略时插入...”。
9.5 及更新版本:
PostgreSQL 9.5 和更新版本支持 INSERT ... ON CONFLICT (key) DO UPDATE
(和 ON CONFLICT (key) DO NOTHING
),即 upsert。
Comparison with ON DUPLICATE KEY UPDATE
。
有关用法,请参见 the manual - 特别是语法图中的 conflict_action 子句和 the explanatory text。
与下面给出的 9.4 及更早版本的解决方案不同,此功能适用于多个冲突行,并且不需要排他锁定或重试循环。
The commit adding the feature is here 和 the discussion around its development is here。
如果您使用的是 9.5 并且不需要向后兼容,您现在可以停止阅读。
9.4 及以上版本:
PostgreSQL 没有任何内置的 UPSERT
(或 MERGE
)工具,在并发使用的情况下有效地做到这一点非常困难。
This article discusses the problem in useful detail。
通常,您必须在两个选项之间进行选择:
重试循环中的单独插入/更新操作;或者
锁定表并进行批量合并
单行重试循环
如果您希望多个连接同时尝试执行插入,则在重试循环中使用单独的行 upsert 是合理的选择。
The PostgreSQL documentation contains a useful procedure that'll let you do this in a loop inside the database。与大多数幼稚的解决方案不同,它可以防止丢失更新和插入竞争。它只能在 READ COMMITTED
模式下工作,并且只有当它是您在事务中唯一做的事情时才是安全的。如果触发器或辅助唯一键导致唯一违规,该功能将无法正常工作。
这种策略非常低效。只要可行,您应该排队工作并按照如下所述进行批量更新。
许多尝试解决此问题的方法都没有考虑回滚,因此导致更新不完整。两笔交易相互竞争;其中之一成功INSERT
;另一个得到一个重复的键错误,而是执行 UPDATE
。 UPDATE
块等待 INSERT
回滚或提交。当它回滚时,UPDATE
条件重新检查匹配零行,因此即使 UPDATE
提交它实际上并没有完成您预期的 upsert。您必须检查结果行数并在必要时重试。
一些尝试的解决方案也未能考虑 SELECT 比赛。如果您尝试明显而简单的方法:
-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.
BEGIN;
UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;
-- Remember, this is WRONG. Do NOT COPY IT.
INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);
COMMIT;
然后当两个同时运行时,会出现几种故障模式。一个是已经讨论过的更新重新检查问题。另一个是两个 UPDATE
同时匹配零行并继续。然后他们都进行了 EXISTS
测试,该测试发生在 INSERT
之前。两者都得到零行,所以都做 INSERT
。一个因重复键错误而失败。
这就是您需要重试循环的原因。您可能认为可以使用巧妙的 SQL 防止重复键错误或丢失更新,但事实并非如此。您需要检查行数或处理重复的键错误(取决于选择的方法)并重试。
请不要为此推出自己的解决方案。就像消息队列一样,它可能是错误的。
带锁的批量更新插入
有时你想做一个批量更新,你有一个新的数据集,你想合并到一个旧的现有数据集中。这比单独的行 upserts 效率高得多,并且应该在可行时首选。
在这种情况下,您通常遵循以下过程:
创建一个临时表
将新数据复制或批量插入到临时表中
以独占模式锁定目标表。这允许其他事务选择,但不允许对表进行任何更改。
使用临时表中的值对现有记录执行 UPDATE ... FROM;
插入目标表中尚不存在的行;
COMMIT,释放锁。
例如,对于问题中给出的示例,使用多值 INSERT
填充临时表:
BEGIN;
CREATE TEMPORARY TABLE newvals(id integer, somedata text);
INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');
LOCK TABLE testtable IN EXCLUSIVE MODE;
UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;
INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;
COMMIT;
相关阅读
UPSERT 维基页面
Postgres 中的 UPSERTisms
插入,在 PostgreSQL 中重复更新?
http://petereisentraut.blogspot.com/2010/05/merge-syntax.html
使用事务更新插入
函数中的 SELECT 或 INSERT 是否容易出现竞争条件?
PostgreSQL wiki 上的 SQL MERGE
现在在 Postgresql 中实现 UPSERT 的最惯用方式
合并呢?
SQL 标准的 MERGE
实际上定义不明确的并发语义,不适合在不先锁定表的情况下进行更新插入。
对于数据合并来说,这是一个非常有用的 OLAP 语句,但对于并发安全的 upsert,它实际上并不是一个有用的解决方案。对于使用其他 DBMS 的人使用 MERGE
进行更新插入,有很多建议,但实际上是错误的。
其他数据库:
插入...在 MySQL 中进行重复键更新
来自 MS SQL Server 的 MERGE(但请参阅上文关于 MERGE 问题的信息)
来自 Oracle 的 MERGE(但请参阅上文关于 MERGE 问题的内容)
以下是 insert ... on conflict ...
的一些示例(pg 9.5+):
插入,冲突 - 什么都不做。在冲突时插入虚拟(id,名称,大小)值(1,'new_name',3)什么都不做;`
插入,冲突时 - 进行更新,通过列指定冲突目标。在冲突(id)上插入虚拟(id,name,size)值(1,'new_name',3)do update set name ='new_name',size = 3;
插入,冲突时 - 进行更新,通过约束名称指定冲突目标。插入到 dummy(id, name, size) values(1, 'new_name', 3) on conflict on constraint dummy_pkey do update set name = 'new_name', size = 4;
我正在尝试为 PostgreSQL 9.5 之前版本的单插入问题提供另一种解决方案。这个想法只是尝试首先执行插入,如果记录已经存在,则更新它:
do $$
begin
insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
update testtable set somedata = 'Joe' where id = 2;
end $$;
请注意,只有在没有删除表行的情况下才能应用此解决方案。
我不知道这个解决方案的效率,但在我看来这很合理。
insert on update
Postgres >=9.5 的 SQLAlchemy upsert
由于上面的大篇幅涵盖了 Postgres 版本的许多不同 SQL 方法(不仅仅是问题中的非 9.5),如果您使用的是 Postgres 9.5,我想在 SQLAlchemy 中添加如何做到这一点。除了实现自己的 upsert,您还可以使用 SQLAlchemy 的函数(在 SQLAlchemy 1.1 中添加)。就个人而言,如果可能的话,我会推荐使用这些。不仅因为方便,还因为它允许 PostgreSQL 处理可能发生的任何竞争条件。
我昨天给出的另一个答案的交叉发布(https://stackoverflow.com/a/44395983/2156909)
SQLAlchemy 现在通过两种方法 on_conflict_do_update()
和 on_conflict_do_nothing()
支持 ON CONFLICT
:
从文档中复制:
from sqlalchemy.dialects.postgresql import insert
stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
index_elements=[my_table.c.user_email],
index_where=my_table.c.user_email.like('%@gmail.com'),
set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)
WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS
在 Postgresql 9.3 上测试
SERIALIZABLE
隔离,您会因序列化失败而中止,否则您可能会遇到独特的违规行为。不要重新发明 upsert,重新发明将是错误的。使用 INSERT ... ON CONFLICT ...
。如果您的 PostgreSQL 太旧,请更新它。
INSERT ... ON CLONFLICT ...
不适用于批量加载。从您的帖子中,CTE 中的 LOCK TABLE testtable IN EXCLUSIVE MODE;
是一种获得原子事物的解决方法。不 ?
insert ... where not exists ...
或类似操作。
WITH upsert AS ( UPDATE tbl SET foo = 42 RETURNING * ) INSERT INTO tbl(foo) SELECT 42 WHERE NOT EXISTS (SELECT * FROM upsert);
- 这对我有用
由于 this question 已关闭,我将在此处发布您如何使用 SQLAlchemy 进行操作。通过递归,它重试批量插入或更新以对抗 race conditions 和验证错误。
首先是进口
import itertools as it
from functools import partial
from operator import itemgetter
from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts
现在有几个辅助函数
def chunk(content, chunksize=None):
"""Groups data into chunks each with (at most) `chunksize` items.
https://stackoverflow.com/a/22919323/408556
"""
if chunksize:
i = iter(content)
generator = (list(it.islice(i, chunksize)) for _ in it.count())
else:
generator = iter([content])
return it.takewhile(bool, generator)
def gen_resources(records):
"""Yields a dictionary if the record's id already exists, a row object
otherwise.
"""
ids = {item[0] for item in session.query(Posts.id)}
for record in records:
is_row = hasattr(record, 'to_dict')
if is_row and record.id in ids:
# It's a row but the id already exists, so we need to convert it
# to a dict that updates the existing record. Since it is duplicate,
# also yield True
yield record.to_dict(), True
elif is_row:
# It's a row and the id doesn't exist, so no conversion needed.
# Since it's not a duplicate, also yield False
yield record, False
elif record['id'] in ids:
# It's a dict and the id already exists, so no conversion needed.
# Since it is duplicate, also yield True
yield record, True
else:
# It's a dict and the id doesn't exist, so we need to convert it.
# Since it's not a duplicate, also yield False
yield Posts(**record), False
最后是 upsert 函数
def upsert(data, chunksize=None):
for records in chunk(data, chunksize):
resources = gen_resources(records)
sorted_resources = sorted(resources, key=itemgetter(1))
for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
items = [g[0] for g in group]
if dupe:
_upsert = partial(session.bulk_update_mappings, Posts)
else:
_upsert = session.add_all
try:
_upsert(items)
session.commit()
except IntegrityError:
# A record was added or deleted after we checked, so retry
#
# modify accordingly by adding additional exceptions, e.g.,
# except (IntegrityError, ValidationError, ValueError)
db.session.rollback()
upsert(items)
except Exception as e:
# Some other error occurred so reduce chunksize to isolate the
# offending row(s)
db.session.rollback()
num_items = len(items)
if num_items > 1:
upsert(items, num_items // 2)
else:
print('Error adding record {}'.format(items[0]))
这是你如何使用它
>>> data = [
... {'id': 1, 'text': 'updated post1'},
... {'id': 5, 'text': 'updated post5'},
... {'id': 1000, 'text': 'new post1000'}]
...
>>> upsert(data)
与 bulk_save_objects
相比,它的优势在于它可以在插入时处理关系、错误检查等(与 bulk operations 不同)。
SERIALIZABLE
事务并处理序列化失败,但速度很慢。您需要错误处理和重试循环。请参阅我的答案和其中的“相关阅读”部分。