Module nostalgia.sources.apple.icloud
View Source
import just import pandas as pd from nostalgia.ndf import NDF from nostalgia.times import datetime_from_format class ICloud(NDF): @classmethod def load(cls, nrows=None): files = "~/nostalgia_data/input/apple/*/iCloudUsageData Set*.csv" icloud = pd.concat([pd.read_csv(f, skiprows=1, error_bad_lines=False) for f in just.glob(files)]) icloud = icloud.iloc[:icloud.loc[icloud.Date == "Photos: Delete photo/video from iCloud Photo Library"].index.to_list()[0]] icloud["File Capture Date"] = icloud["File Capture Date"].apply(lambda x: datetime_from_format(x, "%Y-%m-%d")) return cls(icloud) if __name__ == "__main__": j = ICloud.load() print(j.as_simple())
Classes
ICloud
class ICloud( data )
View Source
class ICloud(NDF): @classmethod def load(cls, nrows=None): files = "~/nostalgia_data/input/apple/*/iCloudUsageData Set*.csv" icloud = pd.concat([pd.read_csv(f, skiprows=1, error_bad_lines=False) for f in just.glob(files)]) icloud = icloud.iloc[:icloud.loc[icloud.Date == "Photos: Delete photo/video from iCloud Photo Library"].index.to_list()[0]] icloud["File Capture Date"] = icloud["File Capture Date"].apply(lambda x: datetime_from_format(x, "%Y-%m-%d")) return cls(icloud)
Ancestors (in MRO)
- nostalgia.ndf.NDF
- nostalgia.anonymizer.Anonymizer
- nostalgia.data_loading.Loader
Class variables
anonymized
keywords
nlp_columns
nlp_when
selected_columns
vendor
Static methods
anonymize
def anonymize( )
View Source
@classmethod
def anonymize(cls):
global ANONYMIZED
ANONYMIZED = True
class_df_name
def class_df_name( )
View Source
@classmethod
def class_df_name(cls):
name = normalize_name(cls.__name__)
if cls.vendor is not None and not name.startswith(cls.vendor):
name = cls.vendor + "_" + name
return name
df_label
def df_label( )
View Source
@classmethod
def df_label(cls):
return normalize_name(cls.__name__).replace("_", " ").title()
get_normalized_name
def get_normalized_name( )
View Source
@classmethod
def get_normalized_name(cls):
return normalize_name(cls.__name__)
get_schema
def get_schema( *args, **kwargs )
View Source
@classmethod
def get_schema(cls, *args, **kwargs):
sample = cls.load(*args, nrows=5, **kwargs)
return {k: v for k, v in zip(sample.columns, sample.dtypes)}
ingest
def ingest( )
View Source
@classmethod
def ingest(cls):
load_from_download(vendor=cls.vendor, **cls.ingest_settings)
is_anonymized
def is_anonymized( )
View Source
@classmethod
def is_anonymized(cls):
return ANONYMIZED
latest_file_is_historic
def latest_file_is_historic( glob, key_name='', nrows=None, from_cache=True )
Glob is for using a wildcard pattern, and the last created file will be loaded.
See load_data_file_modified_time for further reference.
Returns a pd.DataFrame
View Source
@classmethod
def latest_file_is_historic(cls, glob, key_name="", nrows=None, from_cache=True):
"""
Glob is for using a wildcard pattern, and the last created file will be loaded.
See `load_data_file_modified_time` for further reference.
Returns a pd.DataFrame
"""
recent = max([x for x in just.glob(glob) if "(" not in x], key=os.path.getctime)
return cls.load_data_file_modified_time(recent, key_name, nrows, from_cache)
load
def load( nrows=None )
View Source
@classmethod
def load(cls, nrows=None):
files = "~/nostalgia_data/input/apple/*/iCloudUsageData Set*.csv"
icloud = pd.concat([pd.read_csv(f, skiprows=1, error_bad_lines=False) for f in just.glob(files)])
icloud = icloud.iloc[:icloud.loc[icloud.Date == "Photos: Delete photo/video from iCloud Photo Library"].index.to_list()[0]]
icloud["File Capture Date"] = icloud["File Capture Date"].apply(lambda x: datetime_from_format(x, "%Y-%m-%d"))
return cls(icloud)
load_data_file_modified_time
def load_data_file_modified_time( fname, key_name='', nrows=None, from_cache=True, **kwargs )
It will load from cache if filename is not changed since last run (and there is a cache). If it has changed, it will reprocess and save it in cache (including the modified_time). Handles csv, mbox and json currently. key_name is only for json. nrows is for enabling quickly loading a sample. from_cache=False allows ignoring the cache and reprocessing the file.
Loading the csv, json or mbox file will yield you a DF
IMPORTANT: assumes you implement handle_dataframe_per_file
This is the post-processing required after the file is loaded, for e.g. converting time
dropping and adding columns.
View Source
@classmethod
def load_data_file_modified_time(
cls, fname, key_name="", nrows=None, from_cache=True, **kwargs
):
"""
It will load from cache if filename is not changed since last run (and there is a cache).
If it has changed, it will reprocess and save it in cache (including the modified_time).
Handles csv, mbox and json currently.
key_name is only for json.
nrows is for enabling quickly loading a sample.
from_cache=False allows ignoring the cache and reprocessing the file.
Loading the csv, json or mbox file will yield you a DF
IMPORTANT: assumes you implement `handle_dataframe_per_file`
This is the post-processing required after the file is loaded, for e.g. converting time
dropping and adding columns.
"""
name = fname + "_" + normalize_name(cls.__name__)
modified_time = os.path.getmtime(os.path.expanduser(fname))
last_modified = get_last_mod_time(name)
if modified_time != last_modified or not from_cache:
if fname.endswith(".csv"):
data = pd.read_csv(fname, error_bad_lines=False, nrows=nrows, **kwargs)
elif fname.endswith(".ics"):
from icalevents.icalevents import events
evs = events(file=fname, start=datetime.fromtimestamp(0), end=datetime.now())
data = [
{
"title": ev.summary,
"description": ev.description,
"location": ev.location,
"start": ev.start,
"end": ev.end,
}
for ev in evs
]
data = pd.DataFrame(data)
elif fname.endswith(".mbox"):
import mailbox
m = mailbox.mbox(fname)
data = pd.DataFrame(
[{l: x[l] for l in ["from", "to", "date", "subject"]} for x in m]
)
else:
data = read_array_of_dict_from_json(fname, key_name, nrows, **kwargs)
data = cls.handle_dataframe_per_file(data, fname)
if nrows is None:
save_df(data, name)
save_last_mod_time(modified_time, name)
else:
data = load_df(name, nrows)
if nrows is not None:
data = data.iloc[-nrows:]
return data
load_dataframe_per_json_file
def load_dataframe_per_json_file( glob_pattern, key='', nrows=None )
View Source
@classmethod
def load_dataframe_per_json_file(cls, glob_pattern, key="", nrows=None):
fnames = set(just.glob(glob_pattern))
name = glob_pattern + "_" + normalize_name(cls.__name__)
processed_files = get_processed_files(name)
to_process = fnames.difference(processed_files)
objects = []
if nrows is not None:
if not to_process:
to_process = list(processed_files)[-nrows:]
else:
to_process = list(to_process)[-nrows:]
if to_process:
print("processing {} files".format(len(to_process)))
for fname in to_process:
data = read_array_of_dict_from_json(fname, key, nrows)
data = cls.handle_dataframe_per_file(data, fname)
if data is None:
continue
objects.append(data)
data = pd.concat(objects)
if processed_files and nrows is None:
data = pd.concat((data, load_df(name)))
for x in ["time", "start", "end"]:
if x in data:
data = data.sort_values(x)
break
if nrows is None:
save_df(data, name)
save_processed_files(fnames | processed_files, name)
else:
data = load_df(name)
if nrows is not None:
data = data.iloc[-nrows:]
return data
load_df
def load_df( nrows )
View Source
@classmethod
def load_df(cls, nrows):
return load_df(cls.get_normalized_name(), nrows)
load_image_texts
def load_image_texts( glob_pattern_s, nrows=None )
View Source
@classmethod def load_image_texts(cls, glob_pattern_s, nrows=None): import pytesseract from PIL import Image if isinstance(glob_pattern_s, list): fnames = set() for glob_pattern in glob_pattern_s: fnames.update(set(just.glob(glob_pattern))) glob_pattern = "_".join(glob_pattern_s) else: fnames = set(just.glob(glob_pattern)) name = glob_pattern + "_" + normalize_name(cls.__name__) processed_files = get_processed_files(name) to_process = fnames.difference(processed_files) objects = [] cache = get_cache("tesseract") if nrows is not None: if not to_process: return load_df(name).iloc[-nrows:] else: to_process = list(to_process)[-nrows:] if to_process: for fname in to_process: if fname in cache: text = cache[fname] else: try: text = pytesseract.image_to_string(Image.open(just.make_path(fname))) except OSError as e: print("ERR", fname, e) continue cache[fname] = text time = datetime_from_timestamp(os.path.getmtime(fname), "utc") data = {"text": text, "path": fname, "title": fname.split("/")[-1], "time": time} objects.append(data) data = pd.DataFrame(objects) if processed_files and nrows is None: data = pd.concat((data, load_df(name))) for x in ["time", "start", "end"]: if x in data: data = data.sort_values(x) break if nrows is None: save_df(data, name) save_processed_files(fnames | processed_files, name) else: data = load_df(name) if nrows is not None: data = data.iloc[-nrows:] return data
load_json_file_modified_time
def load_json_file_modified_time( fname, nrows=None, from_cache=True, **kwargs )
View Source
@classmethod
def load_json_file_modified_time(cls, fname, nrows=None, from_cache=True, **kwargs):
name = fname + "_" + normalize_name(cls.__name__)
modified_time = os.path.getmtime(os.path.expanduser(fname))
last_modified = get_last_mod_time(name)
if modified_time != last_modified or not from_cache:
data = just.read(fname)
data = cls.handle_json(data, **kwargs)
data = pd.DataFrame(data)
if nrows is None:
save_df(data, name)
save_last_mod_time(modified_time, name)
else:
data = load_df(name)
if nrows is not None:
data = data.iloc[-nrows:]
return data
load_object_per_newline
def load_object_per_newline( fname, nrows=None )
Iterates over a file containing an object per line (e.g. .jsonl or .txt).
Will only handle new lines not seen earlier; it detects this by storing the number-of-objects seen.
You should implement object_to_row(cls, row) on your class that returns a dictionary.
View Source
@classmethod
def load_object_per_newline(cls, fname, nrows=None):
"""
Iterates over a file containing an object per line (e.g. .jsonl or .txt).
Will only handle new lines not seen earlier; it detects this by storing the number-of-objects seen.
You should implement `object_to_row(cls, row)` on your class that returns a dictionary.
"""
data = []
name = fname + "_" + normalize_name(cls.__name__)
newline_count = get_newline_count(name)
for i, x in enumerate(just.iread(fname)):
if nrows is None:
if i < newline_count:
continue
row = cls.object_to_row(x)
if row is None:
continue
data.append(row)
# breaking at approx 5 rows
if nrows is not None and i > nrows:
break
if data:
data = pd.DataFrame(data)
if newline_count and nrows is None:
data = pd.concat((data, load_df(name)))
if nrows is None:
data = save_df(data, name)
n = i + 1
save_newline_count(n, name)
else:
data = load_df(name)
if nrows is not None:
data = data.iloc[-nrows:]
return data
load_sample_data
def load_sample_data( )
View Source
@classmethod def load_sample_data(cls): nostalgia_dir = os.path.dirname(nostalgia.__file__) fname = os.path.join(nostalgia_dir, "data/samples/" + cls.class_df_name() + ".parquet") if os.path.exists(fname): print("loaded method 1") df = pd.read_parquet(fname) else: import pkgutil from io import BytesIO nostalgia_dir = os.path.dirname(nostalgia.__file__) sample_name = "data/samples/" + cls.class_df_name() + ".parquet" data = pkgutil.get_data("nostalgia", sample_name) print("loaded method 2") df = pd.read_parquet(BytesIO(data)) return cls(df)
register
def register( )
View Source
@classmethod
def register(cls):
return cls.load(nrows=5)
reveal
def reveal( )
View Source
@classmethod
def reveal(cls):
global ANONYMIZED
ANONYMIZED = False
save_df
def save_df( df, name=None )
View Source
@classmethod
def save_df(cls, df, name=None):
return save_df(df, name or cls.get_normalized_name())
Instance variables
at_home
at_work
df_name
duration
during_office_hours
end
in_office_days
in_office_hours
last_day
last_month
last_week
last_year
outside_office_hours
start
text_cols
time
when_asleep
yesterday
Methods
add_heartrate
def add_heartrate( self )
View Source
def add_heartrate(self):
return self.take_from("heartrate", "value")
as_simple
def as_simple( self, max_n=None )
View Source
def as_simple(self, max_n=None):
data = {
"title": self.df_name, # default, to be overwritten
"url": None,
"start": None,
"end": None,
# "body": None,
"type": self.df_name,
"interval": True,
"sender": None,
"value": getattr(self, "value", None),
"index_loc": self.index,
}
for x in ["title", "name", "naam", "subject", "url", "content", "text", "value"]:
res = getattr(self, x, None)
if res is not None:
data["title"] = res
break
res = getattr(self, "sender", None)
if res is not None:
data["sender"] = res
for x in ["url", "path", "file"]:
res = getattr(self, x, None)
if res is not None:
data["url"] = res
break
for x in ["start", "time", "timestamp"]:
res = getattr(self, x, None)
if res is not None:
data["start"] = res
break
for x in ["end"]:
res = getattr(self, x, None)
if res is not None:
data["end"] = res - pd.Timedelta(microseconds=1)
break
if data["end"] is None:
data["end"] = data["start"] + pd.Timedelta(minutes=5)
data["interval"] = False
try:
data = pd.DataFrame(data).sort_values("start")
if max_n is not None:
data = data.iloc[-max_n:]
return data
except ValueError:
raise ValueError(f"No fields are mapped for {self.__class__.__name__}")
at
def at( self, time_or_place )
View Source
def at(self, time_or_place):
if isinstance(time_or_place, NDF) and time_or_place.df_name.endswith("places"):
return self.when_at(time_or_place)
if isinstance(time_or_place, str):
mp = parse_date_tz(time_or_place)
if mp:
start = mp.start_date
end = mp.end_date
return self.at_time(start, end)
else:
return self.when_at(get_type_from_registry("places").containing(time_or_place))
raise ValueError("neither time nor place was passed")
at_day
def at_day( self, day_or_class )
View Source
def at_day(self, day_or_class):
return self[self._select_at_day(day_or_class)]
at_night
def at_night( self, start=22, end=8 )
View Source
def at_night(self, start=22, end=8):
return self.between_hours(start, end)
at_time
def at_time( self, start, end=None, sort_diff=True, **window_kwargs )
View Source
def at_time(self, start, end=None, sort_diff=True, **window_kwargs):
if is_mp(start):
start = start.start_date
end = start.end_date
elif isinstance(start, str) and end is None:
mp = parse_date_tz(start)
start = mp.start_date
end = mp.end_date
elif isinstance(start, str) and isinstance(end, str):
mp = parse_date_tz(start)
start = mp.start_date
mp = parse_date_tz(end)
end = mp.end_date
elif end is None and window_kwargs:
end = start
elif end is None:
raise ValueError(
"Either a metaperiod, a date string, 2 times, or time + window_kwargs."
)
self.infer_time()
if window_kwargs:
start = start - pd.Timedelta(**window_kwargs)
end = end + pd.Timedelta(**window_kwargs)
if self._start_col is None:
res = self[ab_overlap_c(start, end, self[self._time_col])]
else:
res = self[ab_overlap_cd(self[self._start_col], self[self._end_col], start, end)]
if not res.empty and sort_diff:
# avg_time = start + (end - start) / 2
# res["sort_score"] = -abs(res[self._time_col] - avg_time)
# res = res.sort_values('sort_score').drop('sort_score', axis=1)
res["sort_score"] = res[self._time_col]
res = res.sort_values('sort_score').drop('sort_score', axis=1)
return self.__class__(res)
between_hours
def between_hours( self, start=22, end=8 )
View Source
def between_hours(self, start=22, end=8): if self._start_col is not None: return self[(self.start.dt.hour > start) | (self.end.dt.hour < end)] return self[(self.time.dt.hour > start) & (self.time.dt.hour < end)]
browsing
def browsing( self, other, **window_kwargs )
View Source
def browsing(self, other, **window_kwargs):
if isinstance(other, str):
other = get_type_from_registry("browser").containing(other)
return self.__class__(join_time(other, self, **window_kwargs))
by_me
def by_me( self )
View Source
@nlp("filter", "by me", "i", "my")
def by_me(self):
return self
col_contains
def col_contains( self, string, col_name, case=False, regex=False, na=False )
View Source
def col_contains(self, string, col_name, case=False, regex=False, na=False):
return self[self[col_name].str.contains(string, case=case, regex=regex, na=na)]
containing
def containing( self, string, col_name=None, case=False, regex=True, na=False, bound=True )
Filters using string in all string columns when col_name is None, otherwise in just that one
When bound=True it means to add word boundaries to the regex.
case=True is whether to be case-sensitive
regex=True means to treat string as regex
na=False means to consider NaN to be considered False
View Source
def containing(self, string, col_name=None, case=False, regex=True, na=False, bound=True):
"""
Filters using string in all string columns when col_name is None, otherwise in just that one
When `bound=True` it means to add word boundaries to the regex.
case=True is whether to be case-sensitive
regex=True means to treat string as regex
na=False means to consider NaN to be considered False
"""
if regex and bound:
string = r"\b" + string + r"\b"
if col_name is not None:
return self.col_contains(string, col_name, case, regex, na)
bool_cols = [
self[x].str.contains(string, case=case, regex=regex, na=na) for x in self.text_cols
]
bool_array = bool_cols[0]
for b in bool_cols[1:]:
bool_array = np.logical_or(bool_array, b)
return self.__class__(self[bool_array])
count
def count( self )
View Source
@nlp("end", "how many", "how many times", "how often")
def count(self):
return self.shape[0]
create_sample_data
def create_sample_data( self )
View Source
def create_sample_data(self):
nostalgia_dir = os.path.dirname(nostalgia.__file__)
fname = os.path.join(nostalgia_dir, "data/samples/" + self.df_name + ".parquet")
# verify that we can process it
_ = self.as_simple()
sample = self.iloc[:100].reset_index().drop("index", axis=1)
# if self.is_anonymized:
# for x in self.anonymized:
# dtype = self.dtypes[x]
# if str(self.dtypes[x]) == "object":
# sample[x] = x
# else:
# sample[x] = np.random.choice(sample[x], sample.shape[0])
# assert sample[x].dtype == dtype
n = min(sample.shape[0], 5)
if n == 0:
raise ValueError("Empty DataFrame, cannot make sample")
sample = (
sample.sample(n)
.reset_index()
.drop("index", axis=1)
.drop("level_0", axis=1, errors="ignore")
)
sample.to_parquet(fname)
print(f"Sample save as {os.path.abspath(fname)}")
return sample
duration_longer_than
def duration_longer_than( self, **timedelta_kwargs )
View Source
def duration_longer_than(self, **timedelta_kwargs):
return self[(self.end - self.time) >= timedelta(**timedelta_kwargs)]
duration_shorter_than
def duration_shorter_than( self, **timedelta_kwargs )
View Source
def duration_shorter_than(self, **timedelta_kwargs):
return self[(self.end - self.time) <= timedelta(**timedelta_kwargs)]
get_type_from_registry
def get_type_from_registry( self, tp )
View Source
def get_type_from_registry(self, tp):
for key, value in registry.items():
if key.endswith(tp):
return value
head
def head( self, *args, **kwargs )
View Source
def head(self, *args, **kwargs):
return self.__class__(super().head(*args, **kwargs))
heartrate_above
def heartrate_above( self, value )
View Source
def heartrate_above(self, value):
return self.heartrate_range(value)
heartrate_below
def heartrate_below( self, value )
View Source
def heartrate_below(self, value):
return self.heartrate_range(None, value)
heartrate_range
def heartrate_range( self, low, high=None )
View Source
def heartrate_range(self, low, high=None):
if "heartrate_value" not in self.columns:
self.add_heartrate()
if high is not None and low is not None:
return self[(self["heartrate_value"] >= low) & self["heartrate_value"] < high]
if low is not None:
return self[self["heartrate_value"] >= low]
if high is not None:
return self[self["heartrate_value"] < high]
in_a
def in_a( self, s )
View Source
def in_a(self, s):
return self.near(s)
infer_time
def infer_time( self )
View Source
def infer_time(self):
if self.__class__.__name__ == "Results":
self._start_col, self._time_col, self._end_col = "start", "start", "end"
return
times = [x for x, y in zip(self.columns, self.dtypes) if "datetime" in str(y)]
levels = [self.time_level(self[x]) for x in times]
if not levels:
raise ValueError(
f"Either 1 or 2 columns should be of type datetime for {self.__class__.__name__} (0 found)"
)
max_level = max(levels)
# workaround
# start: 10:00:00
# end: 10:00:59
times = [t for t, l in zip(times, levels) if l == max_level or (l == 2 and max_level == 3)]
num_times = len(times)
self.num_times = num_times
if num_times == 0:
self._start_col, self._time_col, self._end_col = None, None, None
elif num_times == 1:
self._start_col, self._time_col, self._end_col = None, times[0], None
elif num_times == 2:
col1, col2 = times
sub = self[self[col1].notnull() & self[col2].notnull()]
a, b = sub[col1], sub[col2]
if (a >= b).all():
col1, col2 = col2, col1
elif not (a <= b).all():
raise ValueError(
"Not strictly one col higher than other with dates, can't determine"
)
if col1 == "end" and col2 == "start":
col2, col1 = col1, col2
self._start_col, self._time_col, self._end_col = col1, col1, col2
interval_index = pd.IntervalIndex.from_arrays(
self[self._start_col], self[self._end_col]
)
self.set_index(interval_index, inplace=True)
self.sort_index(inplace=True)
else:
msg = 'infer time failed: there can only be 1 or 2 datetime columns at the same granularity.'
raise Exception(msg + " Found: " + str(times))
last
def last( self )
View Source
@nlp("filter", "last", "last time", "most recently")
def last(self):
_ = self.time # to get inferred time if not set
col = self._time_col or self._start_col
return self.__class__(self.sort_values(col, na_position="last", ascending=False).iloc[:1])
near
def near( self, s )
View Source
def near(self, s):
if isinstance(s, NDF) and s.df_name.endswith("places"):
selection = s
else:
selection = get_type_from_registry("places").containing(s)
return self.when_at(selection)
not_at_day
def not_at_day( self, day_or_class )
View Source
def not_at_day(self, day_or_class):
return self[~self._select_at_day(day_or_class)]
query
def query( self, expr )
View Source
def query(self, expr):
return self.__class__(super().query(expr))
read
def read( self, index )
View Source
def read(self, index):
return just.read(self.path[index])
show_me
def show_me( self )
View Source
@nlp("end", "show", "show me", "show me the", "show the", "what")
def show_me(self):
_ = self.time # to get inferred time if not set
col = self._time_col or self._start_col
return self.__class__(self.sort_values(col, na_position="last", ascending=False))
sort_values
def sort_values( self, by, axis=0, ascending=True, inplace=False, kind='quicksort', na_position='last' )
View Source
def sort_values(
self, by, axis=0, ascending=True, inplace=False, kind='quicksort', na_position='last'
):
return self.__class__(
pd.DataFrame.sort_values(self, by, axis, ascending, inplace, kind, na_position)
)
tail
def tail( self, *args, **kwargs )
View Source
def tail(self, *args, **kwargs):
return self.__class__(super().tail(*args, **kwargs))
take_from
def take_from( self, registry_ending, col_name )
View Source
def take_from(self, registry_ending, col_name):
for registry_type in registry:
if not registry_type.endswith(registry_ending):
continue
# TODO: loop over columns, so we only do index lookup once
# TODO: do not only try self.time but also self.end
new_name = registry_ending + "_" + col_name
if new_name in self.columns:
return self[new_name]
tp = get_type_from_registry(registry_type)
results = []
if not self.inferred_time:
self.infer_time()
for x in self[self._time_col]:
try:
res = tp.loc[x]
if not isinstance(res, pd.Series):
res = res.iloc[0]
res = res[col_name]
except (KeyError, TypeError):
res = np.nan
results.append(res)
self[new_name] = results
return self[new_name]
time_level
def time_level( self, col )
View Source
def time_level(self, col):
if (col.dt.microsecond != 0).any():
return 4
if (col.dt.second != 0).any():
return 3
if (col.dt.minute != 0).any():
return 2
if (col.dt.hour != 0).any():
return 1
return 0
to_html
def to_html( self )
View Source
def to_html(self):
if self.selected_columns:
data = pd.DataFrame({x: getattr(self, x) for x in self.selected_columns})
return data.to_html()
return super().to_html()
to_place
def to_place( self )
View Source
def to_place(self):
results = []
places = get_type_from_registry("places")
for time in self.time:
try:
results.append(places.iloc[places.index.get_loc(time)].iloc[0])
except (TypeError, KeyError):
pass
return places.__class__(results)
view
def view( self, index )
View Source
def view(self, index):
view(self.path[index])
when
def when( self, other, **window_kwargs )
View Source
def when_at(self, other, **window_kwargs):
if isinstance(other, str):
other = get_type_from_registry("places").containing(other)
return self.__class__(join_time(other, self, **window_kwargs))
when_at
def when_at( self, other, **window_kwargs )
View Source
def when_at(self, other, **window_kwargs):
if isinstance(other, str):
other = get_type_from_registry("places").containing(other)
return self.__class__(join_time(other, self, **window_kwargs))