Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 66 additions & 66 deletions IPython/parallel/client/asyncresult.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None,
self._targets = targets
self._tracker = tracker
self.owner = owner

self._ready = False
self._outputs_ready = False
self._success = None
Expand Down Expand Up @@ -108,14 +108,14 @@ def get(self, timeout=-1):
def _check_ready(self):
    """Raise a TimeoutError unless the result has completed."""
    if self.ready():
        return
    raise error.TimeoutError("Result not ready.")

def ready(self):
    """Return whether the call has completed."""
    if self._ready:
        # results are in; give any pending output messages a chance to land
        if not self._outputs_ready:
            self._wait_for_outputs(0)
    else:
        # non-blocking poll for completion
        self.wait(0)
    return self._ready

def wait(self, timeout=-1):
Expand Down Expand Up @@ -148,12 +148,12 @@ def wait(self, timeout=-1):
# cutoff infinite wait at 10s
timeout = 10
self._wait_for_outputs(timeout)

if self.owner:

self._metadata = [self._client.metadata.pop(mid) for mid in self.msg_ids]
[self._client.results.pop(mid) for mid in self.msg_ids]



def successful(self):
Expand All @@ -178,8 +178,8 @@ def get_dict(self, timeout=-1):
if self._single_result:
results = [results]
engine_ids = [ md['engine_id'] for md in self._metadata ]


rdict = {}
for engine_id, result in zip(engine_ids, results):
if engine_id in rdict:
Expand Down Expand Up @@ -281,24 +281,24 @@ def __iter__(self):
# already done
for r in rlist:
yield r

def __len__(self):
    """Number of messages (one per task) tracked by this result."""
    return len(self.msg_ids)

#-------------------------------------
# Sugar methods and attributes
#-------------------------------------

def timedelta(self, start, end, start_key=min, end_key=max):
"""compute the difference between two sets of timestamps

The default behavior is to use the earliest of the first
and the latest of the second list, but this can be changed
by passing a different

Parameters
----------

start : one or more datetime objects (e.g. ar.submitted)
end : one or more datetime objects (e.g. ar.received)
start_key : callable
Expand All @@ -307,10 +307,10 @@ def timedelta(self, start, end, start_key=min, end_key=max):
end_key : callable
Function to call on `end` to extract the relevant
entry [default: max]

Returns
-------

dt : float
The time elapsed (in seconds) between the two selected timestamps.
"""
Expand All @@ -323,59 +323,59 @@ def timedelta(self, start, end, start_key=min, end_key=max):
# not a list
end = end_key(end)
return (end - start).total_seconds()

@property
def progress(self):
    """The number of tasks which have been completed at this point.

    Fractional progress would be given by 1.0 * ar.progress / len(ar)
    """
    # non-blocking sync with the client's view of outstanding messages
    self.wait(0)
    still_pending = sum(1 for mid in self.msg_ids
                        if mid in self._client.outstanding)
    return len(self) - still_pending

@property
def elapsed(self):
    """Elapsed time (seconds) since the earliest submission.

    Once the result is complete this is simply `wall_time`.
    """
    if self.ready():
        return self.wall_time

    now = datetime.now()
    earliest = now
    metadata = self._client.metadata
    for msg_id in self.msg_ids:
        md = metadata.get(msg_id)
        if md is None:
            continue
        stamp = md['submitted']
        if stamp and stamp < earliest:
            earliest = stamp
    return (now - earliest).total_seconds()

@property
@check_ready
def serial_time(self):
    """Serial computation time of a parallel calculation.

    Computed as the sum of (completed - started) across every task,
    i.e. the total engine-seconds spent, ignoring overlap.
    """
    return sum((md['completed'] - md['started']).total_seconds()
               for md in self._metadata)

@property
@check_ready
def wall_time(self):
    """Actual wall-clock time of a parallel calculation.

    Computed as the span between the latest `received` stamp and the
    earliest `submitted` one.

    Only reliable if the Client was spinning/waiting when the task
    finished, because the `received` timestamp is created when a result
    is pulled off of the zmq queue, which happens as a result of
    `client.spin()`.

    For similar comparison of other timestamp pairs, check out
    AsyncResult.timedelta.
    """
    return self.timedelta(self.submitted, self.received)

def wait_interactive(self, interval=1., timeout=-1):
"""interactive wait, printing progress at regular intervals"""
if timeout is None:
Expand All @@ -389,7 +389,7 @@ def wait_interactive(self, interval=1., timeout=-1):
sys.stdout.flush()
print()
print("done")

def _republish_displaypub(self, content, eid):
"""republish individual displaypub content dicts"""
try:
Expand All @@ -400,48 +400,48 @@ def _republish_displaypub(self, content, eid):
md = content['metadata'] or {}
md['engine'] = eid
ip.display_pub.publish(data=content['data'], metadata=md)

def _display_stream(self, text, prefix='', file=None):
    """Print captured stream text, optionally tagged with a prefix.

    Empty text prints nothing.  A trailing newline is appended only when
    the text lacks one.  For multi-line text the prefix goes on its own
    line so the body lines up underneath it.
    """
    if not text:
        return  # nothing to display
    out = sys.stdout if file is None else file
    has_trailing_nl = text.endswith('\n')
    terminator = '' if has_trailing_nl else '\n'
    # more newlines than just a trailing one means a multi-line body
    is_multiline = text.count('\n') > int(has_trailing_nl)
    if prefix and is_multiline and not text.startswith('\n'):
        prefix += '\n'
    print(prefix + text, file=out, end=terminator)


def _display_single_result(self):
    """Redisplay the streams and rich outputs of a single-target result."""
    # always echo the captured stdout/stderr
    self._display_stream(self.stdout)
    self._display_stream(self.stderr, file=sys.stderr)

    # rich display requires a live IPython; bail out quietly otherwise
    try:
        get_ipython()
    except NameError:
        # displaypub is meaningless outside IPython
        return

    for item in self.outputs:
        self._republish_displaypub(item, self.engine_id)

    if self.execute_result is not None:
        display(self.get())

def _wait_for_outputs(self, timeout=-1):
"""wait for the 'status=idle' message that indicates we have all outputs
"""
if self._outputs_ready or not self._success:
# don't wait on errors
return

# cast None to -1 for infinite timeout
if timeout is None:
timeout = -1

tic = time.time()
while True:
self._client._flush_iopub(self._client._iopub_socket)
Expand All @@ -451,30 +451,30 @@ def _wait_for_outputs(self, timeout=-1):
(timeout >= 0 and time.time() > tic + timeout):
break
time.sleep(0.01)

@check_ready
def display_outputs(self, groupby="type"):
"""republish the outputs of the computation

Parameters
----------

groupby : str [default: type]
if 'type':
Group outputs by type (show all stdout, then all stderr, etc.):

[stdout:1] foo
[stdout:2] foo
[stderr:1] bar
[stderr:2] bar
if 'engine':
Display outputs for each engine before moving on to the next:

[stdout:1] foo
[stderr:1] bar
[stdout:2] foo
[stderr:2] bar

if 'order':
Like 'type', but further collate individual displaypub
outputs. This is meant for cases of each command producing
Expand All @@ -484,52 +484,52 @@ def display_outputs(self, groupby="type"):
if self._single_result:
self._display_single_result()
return

stdouts = self.stdout
stderrs = self.stderr
execute_results = self.execute_result
output_lists = self.outputs
results = self.get()

targets = self.engine_id

if groupby == "engine":
for eid,stdout,stderr,outputs,r,execute_result in zip(
targets, stdouts, stderrs, output_lists, results, execute_results
):
self._display_stream(stdout, '[stdout:%i] ' % eid)
self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)

try:
get_ipython()
except NameError:
# displaypub is meaningless outside IPython
return
return

if outputs or execute_result is not None:
_raw_text('[output:%i]' % eid)

for output in outputs:
self._republish_displaypub(output, eid)

if execute_result is not None:
display(r)

elif groupby in ('type', 'order'):
# republish stdout:
for eid,stdout in zip(targets, stdouts):
self._display_stream(stdout, '[stdout:%i] ' % eid)

# republish stderr:
for eid,stderr in zip(targets, stderrs):
self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)

try:
get_ipython()
except NameError:
# displaypub is meaningless outside IPython
return

if groupby == 'order':
output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
N = max(len(outputs) for outputs in output_lists)
Expand All @@ -546,28 +546,28 @@ def display_outputs(self, groupby="type"):
_raw_text('[output:%i]' % eid)
for output in outputs:
self._republish_displaypub(output, eid)

# finally, add execute_result:
for eid,r,execute_result in zip(targets, results, execute_results):
if execute_result is not None:
display(r)

else:
raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)




class AsyncMapResult(AsyncResult):
"""Class for representing results of non-blocking gathers.

This will properly reconstruct the gather.

This class is iterable at any time, and will wait on results as they come.

If ordered=False, then the first results to arrive will come first, otherwise
results will be yielded in the order they were submitted.

"""

def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
Expand Down Expand Up @@ -655,7 +655,7 @@ class AsyncHubResult(AsyncResult):
def _wait_for_outputs(self, timeout=-1):
    """No-op: results fetched from the Hub are never incomplete.

    The timeout argument is accepted only for interface compatibility
    and is ignored; outputs are simply marked ready.
    """
    self._outputs_ready = True

def wait(self, timeout=-1):
"""wait for result to complete."""
start = time.time()
Expand Down Expand Up @@ -698,6 +698,6 @@ def wait(self, timeout=-1):
if self.owner:
[self._client.metadata.pop(mid) for mid in self.msg_ids]
[self._client.results.pop(mid) for mid in self.msg_ids]


__all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
Loading