@staticmethod
def interpolate(t_i, datum, other_ts, other_datums):
    """Linearly interpolate the other series at time ``t_i``.

    The sample closest below ``t_i`` (last index whose timestamp is
    strictly smaller) and the sample closest above it (first index whose
    timestamp is strictly larger) are used as anchors. A missing anchor
    is replaced by ``(t_i, datum)``, so at an edge with only one
    neighbour the result is ``datum``. A sample lying exactly at ``t_i``
    is never used as an anchor.

    Assumes ``other_ts`` is sorted ascending and index-aligned with
    ``other_datums``; the same logic serves up- and down-sampling.

    Args:
        t_i: timestamp at which a value is wanted.
        datum: fallback value at ``t_i``, used for any missing anchor.
        other_ts: timestamps of the series being interpolated.
        other_datums: values of the series being interpolated.

    Returns:
        ``lower + ratio * (upper - lower)`` for the two anchors, or
        ``datum`` unchanged when there is no anchor on either side.
    """
    indices = range(len(other_datums))
    # Last index strictly below t_i / first index strictly above t_i;
    # -1 marks "no such sample".
    lower_idx = next(
        (i for i in reversed(indices) if other_ts[i] < t_i), -1)
    upper_idx = next(
        (i for i in indices if other_ts[i] > t_i), -1)

    if lower_idx < 0 and upper_idx < 0:
        # Empty series, or every sample sits exactly at t_i: both anchors
        # would collapse onto (t_i, datum) and the ratio would be 0 / 0.
        return datum

    if lower_idx >= 0:
        t_l, lower_datum = other_ts[lower_idx], other_datums[lower_idx]
    else:
        t_l, lower_datum = t_i, datum
    if upper_idx >= 0:
        t_u, upper_datum = other_ts[upper_idx], other_datums[upper_idx]
    else:
        t_u, upper_datum = t_i, datum

    # implicit linear transition model
    elapsed = t_i - t_l
    if isinstance(elapsed, int):
        # Plain integer timestamps would make '/' truncate on Python 2.
        elapsed = float(elapsed)
    ratio = elapsed / (t_u - t_l)
    return lower_datum + ratio * (upper_datum - lower_datum)
@staticmethod
def blend_diff_sized_samples(
        xs_times, xs_list, ys_times, ys_list,
        direction="down", custom_size=None):
    """Resample two timestamped series onto a common set of timestamps.

    If both series already have the same length they are returned
    untouched. Otherwise one series is re-evaluated at the other's
    timestamps through ``Util.interpolate``:

    * ``"down"``: the longer series is interpolated at the shorter
      series' timestamps.
    * anything else: the shorter series is interpolated at the longer
      series' timestamps.

    NOTE: ``"custom"`` passes validation but currently takes the same
    path as ``"up"``; ``custom_size`` is not used.

    Args:
        xs_times, xs_list: timestamps and values of the first series
            (must have equal length).
        ys_times, ys_list: timestamps and values of the second series
            (must have equal length).
        direction: one of ``"up"``, ``"down"``, ``"custom"``.
        custom_size: unused.

    Returns:
        A 4-tuple ``(xs_times, xs_list, ys_times, ys_list)`` in which both
        series share the same timestamps (unless the input lengths were
        already equal, in which case the inputs are returned as-is).

    Raises:
        AssertionError: on mismatched time/value lengths or an unknown
            ``direction``.
    """
    assert len(xs_times) == len(xs_list)
    assert len(ys_times) == len(ys_list)
    assert direction in ["up", "down", "custom"]
    if len(xs_list) == len(ys_list):
        return xs_times, xs_list, ys_times, ys_list

    # Keep each value list paired with its own timestamps.
    min_is_x = len(xs_list) < len(ys_list)
    if min_is_x:
        min_times, min_list = xs_times, xs_list
        max_times, max_list = ys_times, ys_list
    else:
        min_times, min_list = ys_times, ys_list
        max_times, max_list = xs_times, xs_list

    if direction == "down":
        # downsampled_list is from max_times / max_list
        downsampled_list = [
            Util.interpolate(t, datum, max_times, max_list)
            for t, datum in zip(min_times, min_list)]
        if min_is_x:
            return min_times, min_list, min_times, downsampled_list
        return min_times, downsampled_list, min_times, min_list

    # upsampled list is from min_times, min_list
    upsampled_list = [
        Util.interpolate(t, datum, min_times, min_list)
        for t, datum in zip(max_times, max_list)]
    if min_is_x:
        return max_times, upsampled_list, max_times, max_list
    return max_times, max_list, max_times, upsampled_list
class EventDispatch(object):
    """Starts events on their own threads, one thread per event id.

    Running events are tracked in ``thread_registry`` keyed by
    ``event.get_id()``. Dispatching an event whose id is already
    registered marks it ``DISPATCH_FAILED`` instead of starting it.
    ``lock_registry`` is a plain dict made available to events.
    """

    def __init__(self,
                 blackboard=None,
                 node_name="",
                 service_handler=None):
        """Set up registries, locks and the dispatch switch.

        Args:
            blackboard: object holding Event definitions. Stored on the
                instance only when ``node_name`` is also given.
            node_name: when empty, a warning is logged and setup stops
                early.
            service_handler: accepted but not used by the code visible
                here.
        """
        self.dispatch_lock = Lock()
        self.lock_registry = {}
        self.thread_registry = {}
        self.dispatch_switch_lock = Lock()
        self.dispatch_switch = True
        wrap_instance_method(self, 'dispatch',
                             call_when_switch_turned_on(
                                 self, "dispatch_switch",
                                 "dispatch_switch_lock"))
        # if any event sets the switch off
        # no other events are dispatched
        # until the switch is cleared

        # Defined up front so the attribute exists even when one of the
        # early returns below is taken.
        self.blackboard = None
        if blackboard is None:
            rospy.logwarn(
                "EventDispatch::no blackboard => no service")
            return
        if node_name == "":
            rospy.logwarn(
                "EventDispatch::no node_name => no service")
            return
        # given a blackboard with Event definitions
        self.blackboard = blackboard

    def dispatch(self, event, *args, **kwargs):
        """Run ``event.dispatch(self, *args, **kwargs)`` on a new thread.

        If a thread is already registered under the event's id, the
        event's status is set to ``DISPATCH_FAILED`` and nothing is
        started. ``dispatch_finish`` is passed to the thread as its
        callback.
        """
        # 'with' guarantees the lock is released even if get_id(), thread
        # construction or start() raises.
        with self.dispatch_lock:
            event_id = event.get_id()
            if event_id in self.thread_registry:
                event.status = DISPATCH_STATUS.DISPATCH_FAILED
                return
            worker = StoppableThread(
                target=lambda args=args, kwargs=kwargs:
                    event.dispatch(self, *args, **kwargs),
                # note that the event is dispatched with a reference
                # to the event dispatch, giving access / control
                # over other events
                callback=lambda event=event, args=args, kwargs=kwargs:
                    self.dispatch_finish(event, *args, **kwargs))
            self.thread_registry[event_id] = worker
            worker.start()

    def dispatch_finish(self, event, *args, **kwargs):
        """Unregister the event's thread, then call ``event.finish``."""
        self.thread_registry.pop(event.get_id(), None)
        event.finish(self, *args, **kwargs)
        # ONLY the event defines what is dispatched next
        # this includes multiple subsequent concurrent events
class Event(object):
    """Base class for events handed to an event dispatcher.

    Subclasses override ``finish``, ``dispatch_done``, ``finish_done``,
    ``catch_exception`` and ``deserialize``; the base versions raise
    ``NotImplementedError``. The base ``dispatch`` only registers a lock
    for the event and remembers the dispatcher.
    """

    def __init__(self, event_id, *args, **kwargs):
        """Store the id and wrap ``dispatch`` / ``finish`` on the instance.

        ``dispatch`` and ``finish`` are each wrapped with
        ``watchdog_try_to_run_and_notify_failure``, which is given the
        matching ``*_done`` callback and ``catch_exception``. ``finish`` is
        then wrapped once more with ``call_when_switch_turned_on`` keyed on
        ``finish_switch`` / ``finish_switch_lock``.
        """
        self.event_id = event_id
        self.status = DISPATCH_STATUS.UNKNOWN
        self.event_dispatch = None
        self.dispatching_event = None

        dispatch_watchdog = watchdog_try_to_run_and_notify_failure(
            "dispatch", self.dispatch_done, self.catch_exception)
        wrap_instance_method(self, "dispatch", dispatch_watchdog)

        finish_watchdog = watchdog_try_to_run_and_notify_failure(
            "finish", self.finish_done, self.catch_exception)
        wrap_instance_method(self, "finish", finish_watchdog)

        self.finish_switch_lock = Lock()
        self.finish_switch = True
        finish_gate = call_when_switch_turned_on(
            self, "finish_switch", "finish_switch_lock")
        wrap_instance_method(self, 'finish', finish_gate)

    def get_id(self):
        """Return ``"<ClassName>@<event_id>"``."""
        return "@".join((self.__class__.__name__, str(self.event_id)))

    # methods for the child to override
    def dispatch(self, event_dispatch, *args, **kwargs):
        """Ensure a lock exists for this event id and keep the dispatcher."""
        key = self.get_id()
        registry = event_dispatch.lock_registry
        if key not in registry:
            registry[key] = Lock()
        self.event_dispatch = event_dispatch

    def finish(self, event_dispatch, *args, **kwargs):
        """To be overridden by subclasses."""
        raise NotImplementedError

    def dispatch_done(self, result_key, result):
        """To be overridden by subclasses."""
        raise NotImplementedError

    def finish_done(self, result_key, result):
        """To be overridden by subclasses."""
        raise NotImplementedError

    def catch_exception(self, result_key, result):
        """To be overridden by subclasses."""
        raise NotImplementedError

    @staticmethod
    def deserialize(blackboard, req):
        """To be overridden by subclasses."""
        raise NotImplementedError