Commit 34f4ec9

[IMP] snippets: move all work from parent to mp workers
In `convert_html_columns()`, we select 100MiB worth of DB tuples and pass them to a ProcessPoolExecutor together with a converter callable. So far, the converter returns all tuples, changed or unchanged, together with a flag telling whether anything was changed. All of this is returned through IPC to the parent process. The caller in the parent process, however, only acts on the changed tuples; the rest is ignored. In every scenario I've seen, only a small proportion of the input tuples is actually changed, meaning that a large proportion is returned through IPC unnecessarily.

What makes it worse is that processing the converted results in the parent process is often slower than the conversion itself, leading to two effects:

1) The results of all workers pile up in the parent process's memory, possibly leading to MemoryError (upg-2021031).
2) The parallel processing is serialized on this feedback, defeating a large part of the intended performance gains.

To improve this, this commit
- moves all work into the workers: not just the conversion filter, but also the DB query and the DB updates;
- thereby reduces the amount of data passed over IPC to just the query texts;
- thereby distributes the data held in memory across all worker processes;
- reduces the chunk size by one order of magnitude, which means
  - a lot less memory used at a time, and
  - a much better distribution of "to-be-changed" rows when those rows are clustered in the table.

All in all, in my test case, this
- reduces the maximum process size in memory to 300MiB for all processes, compared to formerly >2GiB (and MemoryError) in the parent process;
- reduces the runtime from 17 minutes to less than 2 minutes.
1 parent daa5888 commit 34f4ec9
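
The pattern the commit adopts can be sketched outside of Odoo. Below is a minimal illustration, assuming plain psycopg2, a hypothetical table `tbl`, and a hypothetical `convert()` callback (the real code goes through `odoo.sql_db.db_connect` and the `Convertor` class shown in the diff below): workers receive only query strings, open their own connections, and apply their own updates, so nothing but exceptions travels back over IPC.

import concurrent.futures

import psycopg2  # assumption: plain psycopg2; the real code uses odoo.sql_db.db_connect

DSN = "dbname=test"  # hypothetical connection string
UPDATE_QUERY = "UPDATE tbl SET body = %(body)s WHERE id = %(id)s"


def convert(row):
    # Hypothetical converter: return a dict of changes, or None when nothing changed.
    res_id, body = row
    new_body = body.replace("snippet_old", "snippet_new")
    return {"id": res_id, "body": new_body} if new_body != body else None


def process_query(query):
    # Each worker task opens its own connection; row data never crosses the
    # process boundary.
    conn = psycopg2.connect(DSN)
    try:
        with conn, conn.cursor() as cr:  # the with-block commits on success
            cr.execute(query)
            for changes in filter(None, map(convert, cr.fetchall())):
                cr.execute(UPDATE_QUERY, changes)
    finally:
        conn.close()
    # Returning None keeps the IPC payload empty; only exceptions travel back.


if __name__ == "__main__":
    queries = [
        f"SELECT id, body FROM tbl WHERE id BETWEEN {lo} AND {lo + 999}"
        for lo in range(1, 100000, 1000)
    ]
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(process_query, q) for q in queries]
        for future in concurrent.futures.as_completed(futures):
            future.result()  # re-raises any worker exception in the parent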

2 files changed: +59, -30 lines

src/base/tests/test_util.py: 22 additions & 19 deletions
@@ -15,7 +15,7 @@
 except ImportError:
     import mock
 
-from odoo import modules
+from odoo import SUPERUSER_ID, api, modules
 from odoo.tools import mute_logger
 
 from odoo.addons.base.maintenance.migrations import util
@@ -1965,24 +1965,27 @@ def testsnip(self):
         </script>
         </html>
         """
-        view_id = self.env["ir.ui.view"].create(
-            {
-                "name": "not_for_anything",
-                "type": "qweb",
-                "mode": "primary",
-                "key": "test.htmlconvert",
-                "arch_db": view_arch,
-            }
-        )
-        cr = self.env.cr
-        snippets.convert_html_content(
-            cr,
-            snippets.html_converter(
-                not_doing_anything_converter, selector="//*[hasclass('fake_class_not_doing_anything')]"
-            ),
-        )
-        util.invalidate(view_id)
-        res = self.env["ir.ui.view"].search_read([("id", "=", view_id.id)], ["arch_db"])
+        vals = {
+            "name": "not_for_anything",
+            "type": "qweb",
+            "mode": "primary",
+            "key": "test.htmlconvert",
+            "arch_db": view_arch,
+        }
+        # util.convert_html_columns() commits the cursor, use a new transaction to not mess up the test_cr
+        with self.registry.cursor() as cr:
+            env = api.Environment(cr, SUPERUSER_ID, {})
+            view_id = env["ir.ui.view"].create(vals)
+            snippets.convert_html_content(
+                cr,
+                snippets.html_converter(
+                    not_doing_anything_converter, selector="//*[hasclass('fake_class_not_doing_anything')]"
+                ),
+            )
+            util.invalidate(view_id)
+            res = env["ir.ui.view"].search_read([("id", "=", view_id.id)], ["arch_db"])
+        # clean up committed data
+        view_id.unlink()
         self.assertEqual(len(res), 1)
         oneline = lambda s: re.sub(r"\s+", " ", s.strip())
         self.assertEqual(oneline(res[0]["arch_db"]), oneline(view_arch))

src/util/snippets.py: 37 additions & 11 deletions
@@ -1,4 +1,6 @@
 # -*- coding: utf-8 -*-
+import concurrent
+import contextlib
 import inspect
 import logging
 import re
@@ -11,6 +13,9 @@
 from psycopg2.extensions import quote_ident
 from psycopg2.extras import Json
 
+with contextlib.suppress(ImportError):
+    from odoo.sql_db import db_connect
+
 from .const import NEARLYWARN
 from .exceptions import MigrationError
 from .helpers import table_of_model
@@ -243,11 +248,24 @@ def _dumps(self, node):
 
 
 class Convertor:
-    def __init__(self, converters, callback):
+    def __init__(self, converters, callback, dbname=None, update_query=None):
         self.converters = converters
         self.callback = callback
-
-    def __call__(self, row):
+        self.dbname = dbname
+        self.update_query = update_query
+
+    def __call__(self, row_or_query):
+        # backwards compatibility: caller passes rows and expects us to return them converted
+        if not self.dbname:
+            return self._convert_row(row_or_query)
+        # improved interface: caller passes a query for us to fetch input rows, convert and update them
+        with db_connect(self.dbname).cursor() as cr:
+            cr.execute(row_or_query)
+            for changes in filter(None, map(self._convert_row, cr.fetchall())):
+                cr.execute(self.update_query, changes)
+        return None
+
+    def _convert_row(self, row):
         converters = self.converters
         columns = self.converters.keys()
         converter_callback = self.callback
@@ -267,7 +285,7 @@ def __call__(self, row):
             changes[column] = new_content
         if has_changed:
             changes["id"] = res_id
-        return changes
+        return changes if "id" in changes else None
 
 
 def convert_html_columns(cr, table, columns, converter_callback, where_column="IS NOT NULL", extra_where="true"):
@@ -305,17 +323,25 @@ def convert_html_columns(cr, table, columns, converter_callback, where_column="I
     update_sql = ", ".join(f'"{column}" = %({column})s' for column in columns)
     update_query = f"UPDATE {table} SET {update_sql} WHERE id = %(id)s"
 
+    cr.commit()
     with ProcessPoolExecutor(max_workers=get_max_workers()) as executor:
-        convert = Convertor(converters, converter_callback)
-        for query in log_progress(split_queries, logger=_logger, qualifier=f"{table} updates"):
-            cr.execute(query)
-            for data in executor.map(convert, cr.fetchall(), chunksize=1000):
-                if "id" in data:
-                    cr.execute(update_query, data)
+        convert = Convertor(converters, converter_callback, cr.dbname, update_query)
+        futures = [executor.submit(convert, query) for query in split_queries]
+        for future in log_progress(
+            concurrent.futures.as_completed(futures),
+            logger=_logger,
+            qualifier=f"{table} updates",
+            size=len(split_queries),
+            estimate=False,
+            log_hundred_percent=True,
+        ):
+            # just for raising any worker exception
+            future.result()
+    cr.commit()
 
 
 def determine_chunk_limit_ids(cr, table, column_arr, where):
-    bytes_per_chunk = 100 * 1024 * 1024
+    bytes_per_chunk = 10 * 1024 * 1024
     columns = ", ".join(quote_ident(column, cr._cnx) for column in column_arr if column != "id")
     cr.execute(
         f"""
