Skip to content

Instantly share code, notes, and snippets.

@dplepage
Last active December 25, 2015 21:09

Revisions

  1. dplepage renamed this gist Oct 18, 2013. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. dplepage created this gist Oct 18, 2013.
    83 changes: 83 additions & 0 deletions test.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,83 @@
    import datetime as dt

    from dfiance import SchemaObj, Int, String, DateTime

    from timeline import Timeline
    from timeline_dfier import TimelineDfier

    def _format(t):
    yield "Initial: {}".format(t.initial)
    for ts, val in t.events:
    yield '{}: {}'.format(ts, val)

    def format(t):
    return '\n'.join(_format(t))

    class Point(SchemaObj):
    field_types = dict(x=Int(), y=Int(), label=String())
    def __str__(self):
    return "{}@{},{}".format(self.label, self.x, self.y)

    t = Timeline(Point(x=1, y=1, label="WP"))
    t.insert(Point(x=1, y=2, label="WP"))
    then = dt.datetime.now()
    t.insert(Point(x=2, y=4, label="WQ"))
    t.insert(Point(x=1, y=3, label="WP"), timestamp=then)
    assert t.current() == Point(x=2, y=4, label="WQ"), t.current()
    assert t.val_at_time(then) == Point(x=1, y=3, label="WP")

    dfier = TimelineDfier(Point.dfier())
    d = dfier.dictify(t)
    times = [DateTime().dictify(e.timestamp) for e in t.events]
    assert d == {
    'initial': {'y': 1, 'x': 1, 'label': 'WP'},
    'events': [
    (times[0], {'y': 2, 'x': 1, 'label': 'WP'}),
    (times[1], {'y': 3, 'x': 1, 'label': 'WP'}),
    (times[2], {'y': 4, 'x': 2, 'label': 'WQ'})
    ]
    }
    t2 = dfier.undictify(d)
    assert format(t) == format(t2)



    # Example with a custom clock that expresses time as an integer

    class TurnClock(object):
    def __init__(self, turn=0):
    self.turn = turn

    def now(self):
    return self.turn

    def advance(self, t=1):
    self.turn += t

    def set(self, t):
    self.turn = t

    clock = TurnClock(0)
    t = Timeline(Point(x=1, y=1, label="WP"), clock=clock)
    clock.advance()
    t.insert(Point(x=1, y=2, label="WP"))
    clock.advance()
    then = clock.now()
    clock.advance()
    t.insert(Point(x=2, y=4, label="WQ"))
    t.insert(Point(x=1, y=3, label="WP"), timestamp=then)
    assert t.current() == Point(x=2, y=4, label="WQ"), t.current()
    assert t.val_at_time(then) == Point(x=1, y=3, label="WP")

    dfier = TimelineDfier(Point.dfier(), Int())
    d = dfier.dictify(t)
    assert d == {
    'initial': {'y': 1, 'x': 1, 'label': 'WP'},
    'events': [
    (1, {'y': 2, 'x': 1, 'label': 'WP'}),
    (2, {'y': 3, 'x': 1, 'label': 'WP'}),
    (3, {'y': 4, 'x': 2, 'label': 'WQ'})
    ]
    }
    t2 = dfier.undictify(d)
    assert format(t) == format(t2)
    37 changes: 37 additions & 0 deletions timeline.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,37 @@
    import bisect
    from collections import namedtuple
    import datetime as dt

    Event = namedtuple("Event", ('timestamp', 'value'))

    class Timeline(object):
    def __init__(self, initial=None, events=(), clock=None):
    if clock is None:
    clock = dt.datetime
    self.initial = initial
    self.events = map(Event._make, events)
    self.clock = clock

    def set_latest(self, value):
    if self.events:
    self.events[-1] = self.events[-1].replace(value=value)
    else:
    self.initial = value

    def insert(self, value, timestamp=None):
    if timestamp is None:
    timestamp = self.clock.now()
    bisect.insort(self.events, Event(timestamp, value))

    def val_at_time(self, timestamp=None):
    if timestamp is None:
    timestamp = self.clock.now()
    if not self.events or timestamp < self.events[0].timestamp:
    return self.initial
    x = bisect.bisect(self.events, Event(timestamp, None))
    if x < len(self.events) and self.events[x].timestamp == timestamp:
    return self.events[x].value
    return self.events[x-1].value

    def current(self):
    return self.val_at_time(None)
    39 changes: 39 additions & 0 deletions timeline_dfier.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,39 @@
    import datetime as dt

    from dfiance import Dictifier, Field, Invalid, ErrorAggregator, DateTime
    from dfiance import NamedTuple, List

    from timeline import Timeline, Event

    class TimelineDfier(Dictifier):
    def __init__(self, subtype, timetype=None):
    self.val_dfier = Field.asfield(subtype)
    if timetype:
    self.time_dfier = Field.asfield(timetype, not_none=True)
    else:
    self.time_dfier = Field(DateTime(), not_none=True)
    self.event_dfier = NamedTuple(Event(self.time_dfier, self.val_dfier))
    self.list_dfier = List(elt_type=self.event_dfier)

    def dictify(self, value, **kwargs):
    return dict(
    initial = self.val_dfier.dictify(value.initial, **kwargs),
    events = self.list_dfier.dictify(value.events, **kwargs),
    )

    def undictify(self, value, **kwargs):
    if not isinstance(value, dict):
    raise Invalid('not_dict')
    error_agg = ErrorAggregator(autoraise = kwargs.get('fail_early', False))
    with error_agg.checking_sub('initial'):
    initial = self.val_dfier.undictify(value.get('initial'), **kwargs)
    with error_agg.checking_sub('events'):
    events = self.list_dfier.undictify(value.get('events'), **kwargs)
    clock = kwargs.get('timeline_clock', dt.datetime)
    return Timeline(initial, events, clock)

    def validate(self, value, **kwargs):
    if not isinstance(value, TimelineDfier):
    raise Invalid('type_error')
    self.val_dfier.validate(value.initial)
    self.list_dfier.validate(value.events)