Skip to content

Commit 30dd2e2

Browse files
committed
Revert "text: remove legacy PluginAsset support (#2417)"
Summary: Reverting because this is a breaking change that should be deferred until TensorBoard 2.0. This reverts commit 2149119. Test Plan: See the commit message of the commit being reverted. Follow that test plan, but verify that all data shows up instead of only the new data. wchargin-branch: reinstate-text-plugin-assets
1 parent b68abe9 commit 30dd2e2

File tree

3 files changed

+236
-10
lines changed

3 files changed

+236
-10
lines changed

tensorboard/plugins/text/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ py_library(
1919
":metadata",
2020
"//tensorboard:plugin_util",
2121
"//tensorboard/backend:http_util",
22+
"//tensorboard/compat:tensorflow",
2223
"//tensorboard/plugins:base_plugin",
24+
"//tensorboard/util:tb_logging",
2325
"//tensorboard/util:tensor_util",
2426
"@org_mozilla_bleach",
2527
"@org_pocoo_werkzeug",

tensorboard/plugins/text/text_plugin.py

Lines changed: 105 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import json
2222
import textwrap
23+
import threading
24+
import time
2325

2426
# pylint: disable=g-bad-import-order
2527
# Necessary for an internal test with special behavior for numpy.
@@ -31,10 +33,14 @@
3133

3234
from tensorboard import plugin_util
3335
from tensorboard.backend import http_util
36+
from tensorboard.compat import tf
3437
from tensorboard.plugins import base_plugin
3538
from tensorboard.plugins.text import metadata
39+
from tensorboard.util import tb_logging
3640
from tensorboard.util import tensor_util
3741

42+
logger = tb_logging.get_logger()
43+
3844
# HTTP routes
3945
TAGS_ROUTE = '/tags'
4046
TEXT_ROUTE = '/text'
@@ -202,6 +208,19 @@ def __init__(self, context):
202208
"""
203209
self._multiplexer = context.multiplexer
204210

211+
# Cache the last result of index_impl() so that methods that depend on it
212+
# can return without blocking (while kicking off a background thread to
213+
# recompute the current index).
214+
self._index_cached = None
215+
216+
# Lock that ensures that only one thread attempts to compute index_impl()
217+
# at a given time, since it's expensive.
218+
self._index_impl_lock = threading.Lock()
219+
220+
# Pointer to the current thread computing index_impl(), if any. This is
221+
# stored on TextPlugin only to facilitate testing.
222+
self._index_impl_thread = None
223+
205224
def is_active(self):
206225
"""Determines whether this plugin is active.
207226
@@ -212,23 +231,104 @@ def is_active(self):
212231
"""
213232
if not self._multiplexer:
214233
return False
215-
return bool(self._multiplexer.PluginRunToTagToContent(metadata.PLUGIN_NAME))
234+
235+
if self._index_cached is not None:
236+
# If we already have computed the index, use it to determine whether
237+
# the plugin should be active, and if so, return immediately.
238+
if any(self._index_cached.values()):
239+
return True
240+
241+
if self._multiplexer.PluginRunToTagToContent(metadata.PLUGIN_NAME):
242+
# Text data is present in the multiplexer. No need to further check for
243+
# data stored via the outdated plugin assets method.
244+
return True
245+
246+
# We haven't conclusively determined if the plugin should be active. Launch
247+
# a thread to compute index_impl() and return False to avoid blocking.
248+
self._maybe_launch_index_impl_thread()
249+
250+
return False
216251

217252
def frontend_metadata(self):
218253
return base_plugin.FrontendMetadata(element_name='tf-text-dashboard')
219254

255+
def _maybe_launch_index_impl_thread(self):
256+
"""Attempts to launch a thread to compute index_impl().
257+
258+
This may not launch a new thread if one is already running to compute
259+
index_impl(); in that case, this function is a no-op.
260+
"""
261+
# Try to acquire the lock for computing index_impl(), without blocking.
262+
if self._index_impl_lock.acquire(False):
263+
# We got the lock. Start the thread, which will unlock the lock when done.
264+
self._index_impl_thread = threading.Thread(
265+
target=self._async_index_impl,
266+
name='TextPluginIndexImplThread')
267+
self._index_impl_thread.start()
268+
269+
def _async_index_impl(self):
270+
"""Computes index_impl() asynchronously on a separate thread."""
271+
start = time.time()
272+
logger.info('TextPlugin computing index_impl() in a new thread')
273+
self._index_cached = self.index_impl()
274+
self._index_impl_thread = None
275+
self._index_impl_lock.release()
276+
elapsed = time.time() - start
277+
logger.info(
278+
'TextPlugin index_impl() thread ending after %0.3f sec', elapsed)
279+
220280
def index_impl(self):
221-
mapping = self._multiplexer.PluginRunToTagToContent(metadata.PLUGIN_NAME)
281+
run_to_series = self._fetch_run_to_series_from_multiplexer()
282+
283+
# A previous system of collecting and serving text summaries involved
284+
# storing the tags of text summaries within tensors.json files. See if we
285+
# are currently using that system. We do not want to drop support for that
286+
# use case.
287+
name = 'tensorboard_text'
288+
run_to_assets = self._multiplexer.PluginAssets(name)
289+
for run, assets in run_to_assets.items():
290+
if run in run_to_series:
291+
# When runs conflict, the summaries created via the new method override.
292+
continue
293+
294+
if 'tensors.json' in assets:
295+
tensors_json = self._multiplexer.RetrievePluginAsset(
296+
run, name, 'tensors.json')
297+
tensors = json.loads(tensors_json)
298+
run_to_series[run] = tensors
299+
else:
300+
# The mapping should contain all runs among its keys.
301+
run_to_series[run] = []
302+
303+
return run_to_series
304+
305+
def _fetch_run_to_series_from_multiplexer(self):
306+
# TensorBoard is obtaining summaries related to the text plugin based on
307+
# SummaryMetadata stored within Value protos.
308+
mapping = self._multiplexer.PluginRunToTagToContent(
309+
metadata.PLUGIN_NAME)
222310
return {
223-
run: list(tag_to_content)
311+
run: list(tag_to_content.keys())
224312
for (run, tag_to_content)
225313
in six.iteritems(mapping)
226314
}
227315

316+
def tags_impl(self):
317+
# Recompute the index on demand whenever tags are requested, but do it
318+
# in a separate thread to avoid blocking.
319+
self._maybe_launch_index_impl_thread()
320+
321+
# Use the cached index if present. If it's not, just return the result based
322+
# on data from the multiplexer, requiring no disk read.
323+
if self._index_cached:
324+
return self._index_cached
325+
else:
326+
return self._fetch_run_to_series_from_multiplexer()
327+
228328
@wrappers.Request.application
229329
def tags_route(self, request):
230-
index = self.index_impl()
231-
return http_util.Respond(request, index, 'application/json')
330+
response = self.tags_impl()
331+
return http_util.Respond(request, response, 'application/json')
232332

233333
def text_impl(self, run, tag):
234334
try:

tensorboard/plugins/text/text_plugin_test.py

Lines changed: 129 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from __future__ import print_function
2121

2222
import collections
23+
import json
2324
import os
2425
import textwrap
2526
import numpy as np
@@ -328,12 +329,41 @@ def test_text_array_to_html(self):
328329
</table>""")
329330
self.assertEqual(convert(d3), d3_expected)
330331

332+
def assertIsActive(self, plugin, expected_finally_is_active):
333+
"""Helper to simulate threading for asserting on is_active()."""
334+
patcher = tf.compat.v1.test.mock.patch('threading.Thread.start', autospec=True)
335+
mock = patcher.start()
336+
self.addCleanup(patcher.stop)
337+
338+
# Initial response from is_active() is always False.
339+
self.assertFalse(plugin.is_active())
340+
thread = plugin._index_impl_thread
341+
mock.assert_called_once_with(thread)
342+
343+
# The thread hasn't run yet, so is_active() should still be False, and we
344+
# should not have tried to launch a second thread.
345+
self.assertFalse(plugin.is_active())
346+
mock.assert_called_once_with(thread)
347+
348+
# Run the thread; it should clean up after itself.
349+
thread.run()
350+
self.assertIsNone(plugin._index_impl_thread)
351+
352+
if expected_finally_is_active:
353+
self.assertTrue(plugin.is_active())
354+
# The call above shouldn't have launched a new thread.
355+
mock.assert_called_once_with(thread)
356+
else:
357+
self.assertFalse(plugin.is_active())
358+
# The call above should have launched a second thread to check again.
359+
self.assertEqual(2, mock.call_count)
360+
331361
def testPluginIsActiveWhenNoRuns(self):
332362
"""The plugin should be inactive when there are no runs."""
333363
multiplexer = event_multiplexer.EventMultiplexer()
334364
context = base_plugin.TBContext(logdir=self.logdir, multiplexer=multiplexer)
335365
plugin = text_plugin.TextPlugin(context)
336-
self.assertFalse(plugin.is_active())
366+
self.assertIsActive(plugin, False)
337367

338368
def testPluginIsActiveWhenTextRuns(self):
339369
"""The plugin should be active when there are runs with text."""
@@ -342,7 +372,15 @@ def testPluginIsActiveWhenTextRuns(self):
342372
plugin = text_plugin.TextPlugin(context)
343373
multiplexer.AddRunsFromDirectory(self.logdir)
344374
multiplexer.Reload()
345-
self.assertTrue(plugin.is_active())
375+
376+
patcher = tf.compat.v1.test.mock.patch('threading.Thread.start', autospec=True)
377+
mock = patcher.start()
378+
self.addCleanup(patcher.stop)
379+
self.assertTrue(plugin.is_active(), True)
380+
381+
# Data is available within the multiplexer. No thread should have started
382+
# for checking plugin assets data.
383+
self.assertFalse(mock.called)
346384

347385
def testPluginIsActiveWhenRunsButNoText(self):
348386
"""The plugin should be inactive when there are runs but none has text."""
@@ -353,13 +391,99 @@ def testPluginIsActiveWhenRunsButNoText(self):
353391
self.generate_testdata(include_text=False, logdir=logdir)
354392
multiplexer.AddRunsFromDirectory(logdir)
355393
multiplexer.Reload()
356-
self.assertFalse(plugin.is_active())
394+
self.assertIsActive(plugin, False)
395+
396+
def testPluginTagsImpl(self):
397+
patcher = tf.compat.v1.test.mock.patch('threading.Thread.start', autospec=True)
398+
mock = patcher.start()
399+
self.addCleanup(patcher.stop)
357400

358-
def testPluginIndexImpl(self):
359-
run_to_tags = self.plugin.index_impl()
401+
# Initially, the thread for checking for plugin assets data has not run.
402+
# Hence, the mapping should only have data from the multiplexer.
403+
run_to_tags = self.plugin.tags_impl()
360404
self.assertItemsEqual(['fry', 'leela'], run_to_tags.keys())
361405
self.assertItemsEqual(['message', 'vector'], run_to_tags['fry'])
362406
self.assertItemsEqual(['message', 'vector'], run_to_tags['leela'])
407+
thread = self.plugin._index_impl_thread
408+
mock.assert_called_once_with(thread)
409+
410+
# The thread hasn't run yet, so no change in response, and we should not
411+
# have tried to launch a second thread.
412+
run_to_tags = self.plugin.tags_impl()
413+
self.assertItemsEqual(['fry', 'leela'], run_to_tags.keys())
414+
self.assertItemsEqual(['message', 'vector'], run_to_tags['fry'])
415+
self.assertItemsEqual(['message', 'vector'], run_to_tags['leela'])
416+
mock.assert_called_once_with(thread)
417+
418+
# Run the thread; it should clean up after itself.
419+
thread.run()
420+
self.assertIsNone(self.plugin._index_impl_thread)
421+
422+
# Expect response to be identical to calling index_impl() directly.
423+
self.assertEqual(self.plugin.index_impl(), self.plugin.tags_impl())
424+
# The call above should have launched a second thread to check again.
425+
self.assertEqual(2, mock.call_count)
426+
427+
428+
class TextPluginBackwardsCompatibilityTest(tf.test.TestCase):
429+
430+
def setUp(self):
431+
self.logdir = self.get_temp_dir()
432+
self.generate_testdata()
433+
multiplexer = event_multiplexer.EventMultiplexer()
434+
multiplexer.AddRunsFromDirectory(self.logdir)
435+
multiplexer.Reload()
436+
context = base_plugin.TBContext(logdir=self.logdir, multiplexer=multiplexer)
437+
self.plugin = text_plugin.TextPlugin(context)
438+
439+
def generate_testdata(self):
440+
tf.compat.v1.reset_default_graph()
441+
sess = tf.compat.v1.Session()
442+
placeholder = tf.constant('I am deprecated.')
443+
444+
# Previously, we had used a means of creating text summaries that used
445+
# plugin assets (which loaded JSON files containing runs and tags). The
446+
# plugin must continue to be able to load summaries of that format, so we
447+
# create a summary using that old plugin asset-based method here.
448+
plugin_asset_summary = tf.compat.v1.summary.tensor_summary('old_plugin_asset_summary',
449+
placeholder)
450+
assets_directory = os.path.join(self.logdir, 'fry', 'plugins',
451+
'tensorboard_text')
452+
# Make the directory of assets if it does not exist.
453+
if not os.path.isdir(assets_directory):
454+
try:
455+
os.makedirs(assets_directory)
456+
except OSError as err:
457+
self.assertFail('Could not make assets directory %r: %r',
458+
assets_directory, err)
459+
json_path = os.path.join(assets_directory, 'tensors.json')
460+
with open(json_path, 'w+') as tensors_json_file:
461+
# Write the op name to a JSON file that the text plugin later uses to
462+
# determine the tag names of tensors to fetch.
463+
tensors_json_file.write(json.dumps([plugin_asset_summary.op.name]))
464+
465+
run_name = 'fry'
466+
subdir = os.path.join(self.logdir, run_name)
467+
with test_util.FileWriterCache.get(subdir) as writer:
468+
writer.add_graph(sess.graph)
469+
470+
summ = sess.run(plugin_asset_summary)
471+
writer.add_summary(summ)
472+
473+
def testIndex(self):
474+
index = self.plugin.index_impl()
475+
self.assertItemsEqual(['fry'], index.keys())
476+
# The summary made via plugin assets (the old method being phased out) is
477+
# only available for run 'fry'.
478+
self.assertItemsEqual(['old_plugin_asset_summary'],
479+
index['fry'])
480+
481+
def testText(self):
482+
fry = self.plugin.text_impl('fry', 'old_plugin_asset_summary')
483+
self.assertEqual(len(fry), 1)
484+
self.assertEqual(fry[0]['step'], 0)
485+
self.assertEqual(fry[0]['text'], u'<p>I am deprecated.</p>')
486+
363487

364488

365489
if __name__ == '__main__':

0 commit comments

Comments
 (0)