[IMP] snippets: move all work from parent to mp workers #137

Open · wants to merge 1 commit into master
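This PR stops round-tripping row data through the parent process: `convert_html_columns()` now only builds the per-chunk SELECT queries and submits them to the pool, while each worker opens its own cursor, fetches and converts its rows, and runs the UPDATEs itself. A minimal sketch of that pattern, assuming an Odoo environment where `odoo.sql_db.db_connect` is importable and the conversion callable is picklable; the helper names below are illustrative, not taken from the diff:

```python
from concurrent.futures import ProcessPoolExecutor, as_completed

from odoo.sql_db import db_connect


def worker(dbname, select_query, update_query, convert_row):
    # Each worker owns its connection: fetch, convert and update locally instead
    # of shipping rows and results back and forth through the parent process.
    with db_connect(dbname).cursor() as cr:
        cr.execute(select_query)
        for changes in filter(None, map(convert_row, cr.fetchall())):
            cr.execute(update_query, changes)


def run_chunks(dbname, select_queries, update_query, convert_row, max_workers=4):
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(worker, dbname, query, update_query, convert_row)
            for query in select_queries
        ]
        for future in as_completed(futures):
            future.result()  # surface any worker exception in the parent
```

Since the workers commit on their own connections, the parent commits before dispatching (presumably so the separate connections can see its pending writes) and again once the pool drains, which is why the test in `test_util.py` now creates its fixture in a separate registry cursor.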
41 changes: 22 additions & 19 deletions src/base/tests/test_util.py
@@ -15,7 +15,7 @@
except ImportError:
import mock

from odoo import modules
from odoo import SUPERUSER_ID, api, modules
from odoo.tools import mute_logger

from odoo.addons.base.maintenance.migrations import util
@@ -1965,24 +1965,27 @@ def testsnip(self):
</script>
</html>
"""
view_id = self.env["ir.ui.view"].create(
{
"name": "not_for_anything",
"type": "qweb",
"mode": "primary",
"key": "test.htmlconvert",
"arch_db": view_arch,
}
)
cr = self.env.cr
snippets.convert_html_content(
cr,
snippets.html_converter(
not_doing_anything_converter, selector="//*[hasclass('fake_class_not_doing_anything')]"
),
)
util.invalidate(view_id)
res = self.env["ir.ui.view"].search_read([("id", "=", view_id.id)], ["arch_db"])
vals = {
"name": "not_for_anything",
"type": "qweb",
"mode": "primary",
"key": "test.htmlconvert",
"arch_db": view_arch,
}
# util.convert_html_columns() commits the cursor; use a new transaction so we don't mess up the test_cr
with self.registry.cursor() as cr:
env = api.Environment(cr, SUPERUSER_ID, {})
view_id = env["ir.ui.view"].create(vals)
snippets.convert_html_content(
cr,
snippets.html_converter(
not_doing_anything_converter, selector="//*[hasclass('fake_class_not_doing_anything')]"
),
)
util.invalidate(view_id)
res = env["ir.ui.view"].search_read([("id", "=", view_id.id)], ["arch_db"])
# clean up committed data
view_id.unlink()
self.assertEqual(len(res), 1)
oneline = lambda s: re.sub(r"\s+", " ", s.strip())
self.assertEqual(oneline(res[0]["arch_db"]), oneline(view_arch))
48 changes: 37 additions & 11 deletions src/util/snippets.py
@@ -1,4 +1,6 @@
# -*- coding: utf-8 -*-
import concurrent
import contextlib
import inspect
import logging
import re
@@ -11,6 +13,9 @@
from psycopg2.extensions import quote_ident
from psycopg2.extras import Json

with contextlib.suppress(ImportError):
from odoo.sql_db import db_connect

from .const import NEARLYWARN
from .exceptions import MigrationError
from .helpers import table_of_model
@@ -243,11 +248,24 @@ def _dumps(self, node):


class Convertor:
def __init__(self, converters, callback):
def __init__(self, converters, callback, dbname=None, update_query=None):
self.converters = converters
self.callback = callback

def __call__(self, row):
self.dbname = dbname
self.update_query = update_query

def __call__(self, row_or_query):
# backwards compatibility: the caller passes a single row and expects the computed changes back
if not self.dbname:
return self._convert_row(row_or_query)
# improved interface: the caller passes a query; we fetch the rows, convert them, and apply the updates ourselves
with db_connect(self.dbname).cursor() as cr:
cr.execute(row_or_query)
for changes in filter(None, map(self._convert_row, cr.fetchall())):
cr.execute(self.update_query, changes)
return None

def _convert_row(self, row):
converters = self.converters
columns = self.converters.keys()
converter_callback = self.callback
@@ -267,7 +285,7 @@ def __call__(self, row):
changes[column] = new_content
if has_changed:
changes["id"] = res_id
return changes
return changes if "id" in changes else None
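With the new optional `dbname` and `update_query` arguments, `Convertor` keeps its old per-row behaviour for existing callers while gaining a self-contained mode in which a worker receives a SELECT query instead of a row. A rough usage sketch: `converters`, `converter_callback`, `update_query` and `cr` are assumed to come from `convert_html_columns()` below, `parent_fetched_rows` stands in for rows the parent already fetched, and the table/query literals are only illustrative:

```python
# Legacy mode: the parent fetches the rows and applies the returned changes itself.
# Each call returns a dict of changed columns plus "id", or None if nothing changed.
convert = Convertor(converters, converter_callback)
for row in parent_fetched_rows:  # rows previously obtained via cr.fetchall()
    changes = convert(row)
    if changes:
        cr.execute(update_query, changes)

# Worker mode: the Convertor connects to the database by name, runs the SELECT it
# is given, converts every fetched row and executes the UPDATEs on its own cursor.
convert = Convertor(converters, converter_callback, cr.dbname, update_query)
convert("SELECT id, arch_db FROM ir_ui_view WHERE arch_db IS NOT NULL")
```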


def convert_html_columns(cr, table, columns, converter_callback, where_column="IS NOT NULL", extra_where="true"):
@@ -305,17 +323,25 @@ def convert_html_columns(cr, table, columns, converter_callback, where_column="I
update_sql = ", ".join(f'"{column}" = %({column})s' for column in columns)
update_query = f"UPDATE {table} SET {update_sql} WHERE id = %(id)s"

cr.commit()
with ProcessPoolExecutor(max_workers=get_max_workers()) as executor:
convert = Convertor(converters, converter_callback)
for query in log_progress(split_queries, logger=_logger, qualifier=f"{table} updates"):
cr.execute(query)
for data in executor.map(convert, cr.fetchall(), chunksize=1000):
if "id" in data:
cr.execute(update_query, data)
convert = Convertor(converters, converter_callback, cr.dbname, update_query)
futures = [executor.submit(convert, query) for query in split_queries]
for future in log_progress(
concurrent.futures.as_completed(futures),
logger=_logger,
qualifier=f"{table} updates",
size=len(split_queries),
estimate=False,
log_hundred_percent=True,
):
# only used to re-raise any worker exception in the parent
future.result()
cr.commit()


def determine_chunk_limit_ids(cr, table, column_arr, where):
bytes_per_chunk = 100 * 1024 * 1024
bytes_per_chunk = 10 * 1024 * 1024
columns = ", ".join(quote_ident(column, cr._cnx) for column in column_arr if column != "id")
cr.execute(
f"""
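A guess at the motivation for lowering `bytes_per_chunk` from 100 MiB to 10 MiB (the PR does not state it): chunks are now the unit of work handed to the pool, so smaller chunks produce more queries to balance across the workers. A quick illustration with a hypothetical table size:

```python
# Hypothetical figures: ~4 GiB of HTML content split into chunks of each size.
table_bytes = 4 * 1024**3
for bytes_per_chunk in (100 * 1024**2, 10 * 1024**2):
    chunks = -(-table_bytes // bytes_per_chunk)  # ceiling division
    print(f"{bytes_per_chunk // 1024**2} MiB chunks -> {chunks} work items")
# 100 MiB chunks -> 41 work items; 10 MiB chunks -> 410 work items
```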