Module nostalgia.sources.google.gmail
View Source
import just
import pandas as pd
from datetime import datetime

from nostalgia.times import tz, parse
from nostalgia.data_loading import read_array_of_dict_from_json
from nostalgia.sources.google import Google


def try_parse(x):
    try:
        if x.endswith("PST"):
            x = x.replace("PST", "-0800 (PST)")
        elif x.endswith("PDT"):
            x = x.replace("PDT", "-0700 (PDT)")
        d = parse(x)
        if d.tzinfo is None:
            d = d.replace(tzinfo=tz)
        return d
    except:
        return datetime(1970, 1, 1, 0, 0, 0, tzinfo=tz)


# class MBox:
#     ingest_settings = {
#         "ingest_glob": "~/Downloads/*.mbox",
#         "recent_only": False,
#         "delete_existing": False,
#     }


class Gmail(Google):
    me = []

    @classmethod
    def handle_dataframe_per_file(cls, data, fname):
        data["subject"] = data["subject"].astype(str)
        data["to"] = data["to"].astype(str)
        data["sender"] = data["from"].str.extract("<([^>]+)>")
        data.loc[data["sender"].isnull(), "sender"] = data[data["sender"].isnull()][
            "from"
        ].str.strip('"')
        data["sent"] = data.sender.str.contains("|".join(cls.me), na=False)
        data["receiver"] = data["to"].str.extract("<([^>]+)>")
        data.loc[data["receiver"].isnull(), "receiver"] = data.loc[data["receiver"].isnull(), "to"]
        data["received"] = data.receiver.str.contains("|".join(cls.me), na=False)
        data["timestamp"] = pd.to_datetime([try_parse(x) for x in data.date], utc=True).tz_convert(
            tz
        )
        data.drop("date", axis=1, inplace=True)
        return data

    @classmethod
    def load(cls, nrows=None, from_cache=True, **kwargs):
        dfs = [
            cls.load_data_file_modified_time(file_path, nrows=nrows, from_cache=from_cache)
            for file_path in just.glob("~/nostalgia_data/input/google/Takeout/Mail/*.mbox")
        ]
        dfs = [x for x in dfs if not x.empty]
        return cls(pd.concat(dfs))

    def sent_by(self, name=None, email=None, case=False):
        if name is not None and email is not None:
            a = self.sender.str.contains(name, case=case, na=False)
            b = self.sender.str.contains(email, case=case, na=False)
            res = self[a | b]
        elif name is not None:
            res = self[self.sender.str.contains(name, case=case, na=False)]
        elif email is not None:
            res = self[self.sender.str.contains(email, case=case, na=False)]
        return self.__class__(res)

    def received_by(self, name=None, email=None, case=False):
        if name is not None and email is not None:
            a = self.receiver.str.contains(name, case=case, na=False)
            b = self.receiver.str.contains(email, case=case, na=False)
            res = self[a | b]
        elif name is not None:
            res = self[self.receiver.str.contains(name, case=case, na=False)]
        elif email is not None:
            res = self[self.receiver.str.contains(email, case=case, na=False)]
        return self.__class__(res)
Functions
try_parse
def try_parse( x )
View Source
def try_parse(x):
    try:
        if x.endswith("PST"):
            x = x.replace("PST", "-0800 (PST)")
        elif x.endswith("PDT"):
            x = x.replace("PDT", "-0700 (PDT)")
        d = parse(x)
        if d.tzinfo is None:
            d = d.replace(tzinfo=tz)
        return d
    except:
        return datetime(1970, 1, 1, 0, 0, 0, tzinfo=tz)
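A minimal usage sketch of try_parse; the outputs shown in comments are indicative, and the exact offset of the fallback value depends on the tz configured in nostalgia.times:

from nostalgia.sources.google.gmail import try_parse

# "PST"/"PDT" suffixes are rewritten to explicit offsets before parsing
print(try_parse("Mon, 2 Mar 2020 10:15:00 PST"))  # 2020-03-02 10:15:00-08:00
# anything that cannot be parsed falls back to the epoch in the configured timezone
print(try_parse("not a date"))                     # 1970-01-01 00:00:00+...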
Classes
Gmail
class Gmail( data )
View Source
class Gmail(Google):
    me = []

    @classmethod
    def handle_dataframe_per_file(cls, data, fname):
        data["subject"] = data["subject"].astype(str)
        data["to"] = data["to"].astype(str)
        data["sender"] = data["from"].str.extract("<([^>]+)>")
        data.loc[data["sender"].isnull(), "sender"] = data[data["sender"].isnull()][
            "from"
        ].str.strip('"')
        data["sent"] = data.sender.str.contains("|".join(cls.me), na=False)
        data["receiver"] = data["to"].str.extract("<([^>]+)>")
        data.loc[data["receiver"].isnull(), "receiver"] = data.loc[data["receiver"].isnull(), "to"]
        data["received"] = data.receiver.str.contains("|".join(cls.me), na=False)
        data["timestamp"] = pd.to_datetime([try_parse(x) for x in data.date], utc=True).tz_convert(
            tz
        )
        data.drop("date", axis=1, inplace=True)
        return data

    @classmethod
    def load(cls, nrows=None, from_cache=True, **kwargs):
        dfs = [
            cls.load_data_file_modified_time(file_path, nrows=nrows, from_cache=from_cache)
            for file_path in just.glob("~/nostalgia_data/input/google/Takeout/Mail/*.mbox")
        ]
        dfs = [x for x in dfs if not x.empty]
        return cls(pd.concat(dfs))

    def sent_by(self, name=None, email=None, case=False):
        if name is not None and email is not None:
            a = self.sender.str.contains(name, case=case, na=False)
            b = self.sender.str.contains(email, case=case, na=False)
            res = self[a | b]
        elif name is not None:
            res = self[self.sender.str.contains(name, case=case, na=False)]
        elif email is not None:
            res = self[self.sender.str.contains(email, case=case, na=False)]
        return self.__class__(res)

    def received_by(self, name=None, email=None, case=False):
        if name is not None and email is not None:
            a = self.receiver.str.contains(name, case=case, na=False)
            b = self.receiver.str.contains(email, case=case, na=False)
            res = self[a | b]
        elif name is not None:
            res = self[self.receiver.str.contains(name, case=case, na=False)]
        elif email is not None:
            res = self[self.receiver.str.contains(email, case=case, na=False)]
        return self.__class__(res)
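A short usage sketch of the class above. It assumes a Google Takeout mbox export already sits under ~/nostalgia_data/input/google/Takeout/Mail/, and the e-mail addresses below are hypothetical placeholders:

from nostalgia.sources.google.gmail import Gmail

Gmail.me = ["me@example.com"]            # hypothetical address; drives the sent/received flags
mail = Gmail.load(nrows=1000)            # parses the .mbox files; cached on later runs
to_me = mail.received_by(email="me@example.com")
from_alice = mail.sent_by(name="Alice", email="alice@example.com")  # hypothetical sender
print(from_alice[["sender", "subject", "timestamp"]].head())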
Ancestors (in MRO)
- nostalgia.sources.google.Google
- nostalgia.ndf.NDF
- nostalgia.anonymizer.Anonymizer
- nostalgia.data_loading.Loader
Class variables
anonymized
ingest_settings
keywords
me
nlp_columns
nlp_when
selected_columns
vendor
Static methods
anonymize
def anonymize( )
View Source
@classmethod
def anonymize(cls):
    global ANONYMIZED
    ANONYMIZED = True
class_df_name
def class_df_name( )
View Source
@classmethod
def class_df_name(cls):
    name = normalize_name(cls.__name__)
    if cls.vendor is not None and not name.startswith(cls.vendor):
        name = cls.vendor + "_" + name
    return name
df_label
def df_label( )
View Source
@classmethod
def df_label(cls):
    return normalize_name(cls.__name__).replace("_", " ").title()
get_normalized_name
def get_normalized_name( )
View Source
@classmethod
def get_normalized_name(cls):
    return normalize_name(cls.__name__)
get_schema
def get_schema( *args, **kwargs )
View Source
@classmethod
def get_schema(cls, *args, **kwargs):
    sample = cls.load(*args, nrows=5, **kwargs)
    return {k: v for k, v in zip(sample.columns, sample.dtypes)}
handle_dataframe_per_file
def handle_dataframe_per_file( data, fname )
View Source
@classmethod
def handle_dataframe_per_file(cls, data, fname):
    data["subject"] = data["subject"].astype(str)
    data["to"] = data["to"].astype(str)
    data["sender"] = data["from"].str.extract("<([^>]+)>")
    data.loc[data["sender"].isnull(), "sender"] = data[data["sender"].isnull()][
        "from"
    ].str.strip('"')
    data["sent"] = data.sender.str.contains("|".join(cls.me), na=False)
    data["receiver"] = data["to"].str.extract("<([^>]+)>")
    data.loc[data["receiver"].isnull(), "receiver"] = data.loc[data["receiver"].isnull(), "to"]
    data["received"] = data.receiver.str.contains("|".join(cls.me), na=False)
    data["timestamp"] = pd.to_datetime([try_parse(x) for x in data.date], utc=True).tz_convert(
        tz
    )
    data.drop("date", axis=1, inplace=True)
    return data
ingest
def ingest( )
View Source
@classmethod
def ingest(cls):
    load_from_download(vendor=cls.vendor, **cls.ingest_settings)
is_anonymized
def is_anonymized( )
View Source
@classmethod
def is_anonymized(cls):
    return ANONYMIZED
latest_file_is_historic
def latest_file_is_historic( glob, key_name='', nrows=None, from_cache=True )
The glob argument takes a wildcard pattern; the most recently created matching file is loaded.
See load_data_file_modified_time for further reference.
Returns a pd.DataFrame.
View Source
@classmethod
def latest_file_is_historic(cls, glob, key_name="", nrows=None, from_cache=True):
    """
    Glob is for using a wildcard pattern, and the last created file will be loaded.
    See `load_data_file_modified_time` for further reference.
    Returns a pd.DataFrame
    """
    recent = max([x for x in just.glob(glob) if "(" not in x], key=os.path.getctime)
    return cls.load_data_file_modified_time(recent, key_name, nrows, from_cache)
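A hedged example of calling this from a Loader subclass; the class name and glob pattern are hypothetical:

# picks the most recently created matching file; names containing "(" are skipped,
# which avoids "export (1).json"-style duplicate downloads
df = SomeTakeoutSource.latest_file_is_historic("~/Downloads/export*.json", key_name="items")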
load
def load( nrows=None, from_cache=True, **kwargs )
View Source
@classmethod
def load(cls, nrows=None, from_cache=True, **kwargs):
    dfs = [
        cls.load_data_file_modified_time(file_path, nrows=nrows, from_cache=from_cache)
        for file_path in just.glob("~/nostalgia_data/input/google/Takeout/Mail/*.mbox")
    ]
    dfs = [x for x in dfs if not x.empty]
    return cls(pd.concat(dfs))
load_data_file_modified_time
def load_data_file_modified_time( fname, key_name='', nrows=None, from_cache=True, **kwargs )
Loads from cache if the file has not changed since the last run (and a cache exists). If it has changed, the file is reprocessed and the result is saved to cache (together with the modification time). Currently handles csv, ics, mbox and json; key_name applies only to json. nrows allows quickly loading a sample, and from_cache=False ignores the cache and reprocesses the file.
Loading the csv, json or mbox file yields a DataFrame.
IMPORTANT: assumes you implement handle_dataframe_per_file, the post-processing applied after the file is loaded, e.g. converting times and dropping or adding columns.
View Source
@classmethod
def load_data_file_modified_time(
    cls, fname, key_name="", nrows=None, from_cache=True, **kwargs
):
    """
    It will load from cache if filename is not changed since last run (and there is a cache).
    If it has changed, it will reprocess and save it in cache (including the modified_time).
    Handles csv, mbox and json currently.
    key_name is only for json.
    nrows is for enabling quickly loading a sample.
    from_cache=False allows ignoring the cache and reprocessing the file.

    Loading the csv, json or mbox file will yield you a DF

    IMPORTANT: assumes you implement `handle_dataframe_per_file`
    This is the post-processing required after the file is loaded, for e.g. converting time
    dropping and adding columns.
    """
    name = fname + "_" + normalize_name(cls.__name__)
    modified_time = os.path.getmtime(os.path.expanduser(fname))
    last_modified = get_last_mod_time(name)
    if modified_time != last_modified or not from_cache:
        if fname.endswith(".csv"):
            data = pd.read_csv(fname, error_bad_lines=False, nrows=nrows, **kwargs)
        elif fname.endswith(".ics"):
            from icalevents.icalevents import events

            evs = events(file=fname, start=datetime.fromtimestamp(0), end=datetime.now())
            data = [
                {
                    "title": ev.summary,
                    "description": ev.description,
                    "location": ev.location,
                    "start": ev.start,
                    "end": ev.end,
                }
                for ev in evs
            ]
            data = pd.DataFrame(data)
        elif fname.endswith(".mbox"):
            import mailbox

            m = mailbox.mbox(fname)
            data = pd.DataFrame(
                [{l: x[l] for l in ["from", "to", "date", "subject"]} for x in m]
            )
        else:
            data = read_array_of_dict_from_json(fname, key_name, nrows, **kwargs)
        data = cls.handle_dataframe_per_file(data, fname)
        if nrows is None:
            save_df(data, name)
            save_last_mod_time(modified_time, name)
    else:
        data = load_df(name, nrows)
    if nrows is not None:
        data = data.iloc[-nrows:]
    return data
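Because this loader assumes handle_dataframe_per_file is implemented, a minimal subclass sketch may help; the class name, vendor and column names are hypothetical:

import pandas as pd
from nostalgia.data_loading import Loader

class Notes(Loader):
    vendor = "notes"

    @classmethod
    def handle_dataframe_per_file(cls, data, fname):
        # post-processing per file: normalize the time column, drop what we don't keep
        data["time"] = pd.to_datetime(data["time"], utc=True)
        return data.drop("raw", axis=1, errors="ignore")

# reprocessed only when the file's modification time changed; otherwise read from cache
df = Notes.load_data_file_modified_time("~/nostalgia_data/input/notes.csv")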
load_dataframe_per_json_file
def load_dataframe_per_json_file( glob_pattern, key='', nrows=None )
View Source
@classmethod
def load_dataframe_per_json_file(cls, glob_pattern, key="", nrows=None):
    fnames = set(just.glob(glob_pattern))
    name = glob_pattern + "_" + normalize_name(cls.__name__)
    processed_files = get_processed_files(name)
    to_process = fnames.difference(processed_files)
    objects = []
    if nrows is not None:
        if not to_process:
            to_process = list(processed_files)[-nrows:]
        else:
            to_process = list(to_process)[-nrows:]
    if to_process:
        print("processing {} files".format(len(to_process)))
        for fname in to_process:
            data = read_array_of_dict_from_json(fname, key, nrows)
            data = cls.handle_dataframe_per_file(data, fname)
            if data is None:
                continue
            objects.append(data)
        data = pd.concat(objects)
        if processed_files and nrows is None:
            data = pd.concat((data, load_df(name)))
        for x in ["time", "start", "end"]:
            if x in data:
                data = data.sort_values(x)
                break
        if nrows is None:
            save_df(data, name)
            save_processed_files(fnames | processed_files, name)
    else:
        data = load_df(name)
    if nrows is not None:
        data = data.iloc[-nrows:]
    return data
load_df
def load_df( nrows )
View Source
@classmethod
def load_df(cls, nrows):
    return load_df(cls.get_normalized_name(), nrows)
load_image_texts
def load_image_texts( glob_pattern_s, nrows=None )
View Source
@classmethod
def load_image_texts(cls, glob_pattern_s, nrows=None):
    import pytesseract
    from PIL import Image

    if isinstance(glob_pattern_s, list):
        fnames = set()
        for glob_pattern in glob_pattern_s:
            fnames.update(set(just.glob(glob_pattern)))
        glob_pattern = "_".join(glob_pattern_s)
    else:
        fnames = set(just.glob(glob_pattern))
    name = glob_pattern + "_" + normalize_name(cls.__name__)
    processed_files = get_processed_files(name)
    to_process = fnames.difference(processed_files)
    objects = []
    cache = get_cache("tesseract")
    if nrows is not None:
        if not to_process:
            return load_df(name).iloc[-nrows:]
        else:
            to_process = list(to_process)[-nrows:]
    if to_process:
        for fname in to_process:
            if fname in cache:
                text = cache[fname]
            else:
                try:
                    text = pytesseract.image_to_string(Image.open(just.make_path(fname)))
                except OSError as e:
                    print("ERR", fname, e)
                    continue
                cache[fname] = text
            time = datetime_from_timestamp(os.path.getmtime(fname), "utc")
            data = {"text": text, "path": fname, "title": fname.split("/")[-1], "time": time}
            objects.append(data)
        data = pd.DataFrame(objects)
        if processed_files and nrows is None:
            data = pd.concat((data, load_df(name)))
        for x in ["time", "start", "end"]:
            if x in data:
                data = data.sort_values(x)
                break
        if nrows is None:
            save_df(data, name)
            save_processed_files(fnames | processed_files, name)
    else:
        data = load_df(name)
    if nrows is not None:
        data = data.iloc[-nrows:]
    return data
load_json_file_modified_time
def load_json_file_modified_time( fname, nrows=None, from_cache=True, **kwargs )
View Source
@classmethod
def load_json_file_modified_time(cls, fname, nrows=None, from_cache=True, **kwargs):
    name = fname + "_" + normalize_name(cls.__name__)
    modified_time = os.path.getmtime(os.path.expanduser(fname))
    last_modified = get_last_mod_time(name)
    if modified_time != last_modified or not from_cache:
        data = just.read(fname)
        data = cls.handle_json(data, **kwargs)
        data = pd.DataFrame(data)
        if nrows is None:
            save_df(data, name)
            save_last_mod_time(modified_time, name)
    else:
        data = load_df(name)
    if nrows is not None:
        data = data.iloc[-nrows:]
    return data
load_object_per_newline
def load_object_per_newline( fname, nrows=None )
Iterates over a file containing one object per line (e.g. .jsonl or .txt).
Only lines not seen in an earlier run are handled; this is detected by storing the number of objects already processed.
You should implement object_to_row(cls, row) on your class, returning a dictionary per line.
View Source
@classmethod
def load_object_per_newline(cls, fname, nrows=None):
    """
    Iterates over a file containing an object per line (e.g. .jsonl or .txt).
    Will only handle new lines not seen earlier; it detects this by storing the number-of-objects seen.
    You should implement `object_to_row(cls, row)` on your class that returns a dictionary.
    """
    data = []
    name = fname + "_" + normalize_name(cls.__name__)
    newline_count = get_newline_count(name)
    for i, x in enumerate(just.iread(fname)):
        if nrows is None:
            if i < newline_count:
                continue
        row = cls.object_to_row(x)
        if row is None:
            continue
        data.append(row)
        # breaking at approx 5 rows
        if nrows is not None and i > nrows:
            break
    if data:
        data = pd.DataFrame(data)
        if newline_count and nrows is None:
            data = pd.concat((data, load_df(name)))
        if nrows is None:
            data = save_df(data, name)
            n = i + 1
            save_newline_count(n, name)
    else:
        data = load_df(name)
    if nrows is not None:
        data = data.iloc[-nrows:]
    return data
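A sketch of the object_to_row hook this loader expects; the class and field names are hypothetical:

from nostalgia.data_loading import Loader

class Tweets(Loader):
    @classmethod
    def object_to_row(cls, obj):
        # obj is one line's object as yielded by just.iread
        if "text" not in obj:
            return None                      # rows returning None are skipped
        return {"text": obj["text"], "time": obj["created_at"]}

df = Tweets.load_object_per_newline("~/nostalgia_data/input/tweets.jsonl")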
load_sample_data
def load_sample_data( )
View Source
@classmethod
def load_sample_data(cls):
    nostalgia_dir = os.path.dirname(nostalgia.__file__)
    fname = os.path.join(nostalgia_dir, "data/samples/" + cls.class_df_name() + ".parquet")
    if os.path.exists(fname):
        print("loaded method 1")
        df = pd.read_parquet(fname)
    else:
        import pkgutil
        from io import BytesIO

        nostalgia_dir = os.path.dirname(nostalgia.__file__)
        sample_name = "data/samples/" + cls.class_df_name() + ".parquet"
        data = pkgutil.get_data("nostalgia", sample_name)
        print("loaded method 2")
        df = pd.read_parquet(BytesIO(data))
    return cls(df)
register
def register( )
View Source
@classmethod
def register(cls):
    return cls.load(nrows=5)
reveal
def reveal( )
View Source
@classmethod
def reveal(cls):
    global ANONYMIZED
    ANONYMIZED = False
save_df
def save_df( df, name=None )
View Source
@classmethod
def save_df(cls, df, name=None):
    return save_df(df, name or cls.get_normalized_name())
Instance variables
at_home
at_work
df_name
duration
during_office_hours
end
in_office_days
in_office_hours
last_day
last_month
last_week
last_year
outside_office_hours
start
text_cols
time
when_asleep
yesterday
Methods
add_heartrate
def add_heartrate( self )
View Source
def add_heartrate(self):
    return self.take_from("heartrate", "value")
as_simple
def as_simple( self, max_n=None )
View Source
def as_simple(self, max_n=None):
    data = {
        "title": self.df_name,  # default, to be overwritten
        "url": None,
        "start": None,
        "end": None,
        # "body": None,
        "type": self.df_name,
        "interval": True,
        "sender": None,
        "value": getattr(self, "value", None),
        "index_loc": self.index,
    }
    for x in ["title", "name", "naam", "subject", "url", "content", "text", "value"]:
        res = getattr(self, x, None)
        if res is not None:
            data["title"] = res
            break
    res = getattr(self, "sender", None)
    if res is not None:
        data["sender"] = res
    for x in ["url", "path", "file"]:
        res = getattr(self, x, None)
        if res is not None:
            data["url"] = res
            break
    for x in ["start", "time", "timestamp"]:
        res = getattr(self, x, None)
        if res is not None:
            data["start"] = res
            break
    for x in ["end"]:
        res = getattr(self, x, None)
        if res is not None:
            data["end"] = res - pd.Timedelta(microseconds=1)
            break
    if data["end"] is None:
        data["end"] = data["start"] + pd.Timedelta(minutes=5)
        data["interval"] = False
    try:
        data = pd.DataFrame(data).sort_values("start")
        if max_n is not None:
            data = data.iloc[-max_n:]
        return data
    except ValueError:
        raise ValueError(f"No fields are mapped for {self.__class__.__name__}")
at
def at( self, time_or_place )
View Source
def at(self, time_or_place):
    if isinstance(time_or_place, NDF) and time_or_place.df_name.endswith("places"):
        return self.when_at(time_or_place)
    if isinstance(time_or_place, str):
        mp = parse_date_tz(time_or_place)
        if mp:
            start = mp.start_date
            end = mp.end_date
            return self.at_time(start, end)
        else:
            return self.when_at(get_type_from_registry("places").containing(time_or_place))
    raise ValueError("neither time nor place was passed")
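A sketch of the two ways at can be called, reusing the mail frame from the Gmail example above; the values are illustrative:

mail.at("last tuesday")   # a parseable date string is treated as a time period
mail.at("home")           # otherwise it is looked up as a place in the "places" registry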
at_day
def at_day( self, day_or_class )
View Source
def at_day(self, day_or_class):
    return self[self._select_at_day(day_or_class)]
at_night
def at_night( self, start=22, end=8 )
View Source
def at_night(self, start=22, end=8):
    return self.between_hours(start, end)
at_time
def at_time( self, start, end=None, sort_diff=True, **window_kwargs )
View Source
def at_time(self, start, end=None, sort_diff=True, **window_kwargs):
    if is_mp(start):
        # read end before start is overwritten
        end = start.end_date
        start = start.start_date
    elif isinstance(start, str) and end is None:
        mp = parse_date_tz(start)
        start = mp.start_date
        end = mp.end_date
    elif isinstance(start, str) and isinstance(end, str):
        mp = parse_date_tz(start)
        start = mp.start_date
        mp = parse_date_tz(end)
        end = mp.end_date
    elif end is None and window_kwargs:
        end = start
    elif end is None:
        raise ValueError(
            "Either a metaperiod, a date string, 2 times, or time + window_kwargs."
        )
    self.infer_time()
    if window_kwargs:
        start = start - pd.Timedelta(**window_kwargs)
        end = end + pd.Timedelta(**window_kwargs)
    if self._start_col is None:
        res = self[ab_overlap_c(start, end, self[self._time_col])]
    else:
        res = self[ab_overlap_cd(self[self._start_col], self[self._end_col], start, end)]
    if not res.empty and sort_diff:
        # avg_time = start + (end - start) / 2
        # res["sort_score"] = -abs(res[self._time_col] - avg_time)
        # res = res.sort_values('sort_score').drop('sort_score', axis=1)
        res["sort_score"] = res[self._time_col]
        res = res.sort_values('sort_score').drop('sort_score', axis=1)
    return self.__class__(res)
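Usage sketch on the mail frame; timestamps are illustrative, and window_kwargs are forwarded to pd.Timedelta:

mail.at_time("2020-03-01", "2020-03-07")        # between two parsed date strings
mail.at_time("2020-03-02 10:15", minutes=30)    # widened by a 30-minute window on each side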
between_hours
def between_hours( self, start=22, end=8 )
View Source
def between_hours(self, start=22, end=8):
    if self._start_col is not None:
        return self[(self.start.dt.hour > start) | (self.end.dt.hour < end)]
    return self[(self.time.dt.hour > start) & (self.time.dt.hour < end)]
browsing
def browsing( self, other, **window_kwargs )
View Source
def browsing(self, other, **window_kwargs):
    if isinstance(other, str):
        other = get_type_from_registry("browser").containing(other)
    return self.__class__(join_time(other, self, **window_kwargs))
by_me
def by_me( self )
View Source
@nlp("filter", "by me", "i", "my") def by_me(self): return self
col_contains
def col_contains( self, string, col_name, case=False, regex=False, na=False )
View Source
def col_contains(self, string, col_name, case=False, regex=False, na=False):
    return self[self[col_name].str.contains(string, case=case, regex=regex, na=na)]
containing
def containing( self, string, col_name=None, case=False, regex=True, na=False, bound=True )
Filters on string in all text columns when col_name is None, otherwise only in that column.
bound=True adds word boundaries around the regex.
case=True makes the match case-sensitive.
regex=True treats string as a regular expression.
na=False treats missing (NaN) values as non-matches.
View Source
def containing(self, string, col_name=None, case=False, regex=True, na=False, bound=True):
    """
    Filters using string in all string columns when col_name is None, otherwise in just that one
    When `bound=True` it means to add word boundaries to the regex.
    case=True is whether to be case-sensitive
    regex=True means to treat string as regex
    na=False means to consider NaN to be considered False
    """
    if regex and bound:
        string = r"\b" + string + r"\b"
    if col_name is not None:
        return self.col_contains(string, col_name, case, regex, na)
    bool_cols = [
        self[x].str.contains(string, case=case, regex=regex, na=na) for x in self.text_cols
    ]
    bool_array = bool_cols[0]
    for b in bool_cols[1:]:
        bool_array = np.logical_or(bool_array, b)
    return self.__class__(self[bool_array])
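Usage sketch, again on the mail frame; the queries are illustrative:

invoices = mail.containing("invoice")                                        # all text columns, word-bounded
acme = mail.containing("ACME", col_name="subject", case=True, regex=False)   # one column, literal, case-sensitive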
count
def count( self )
View Source
@nlp("end", "how many", "how many times", "how often") def count(self): return self.shape[0]
create_sample_data
def create_sample_data( self )
View Source
def create_sample_data(self):
    nostalgia_dir = os.path.dirname(nostalgia.__file__)
    fname = os.path.join(nostalgia_dir, "data/samples/" + self.df_name + ".parquet")
    # verify that we can process it
    _ = self.as_simple()
    sample = self.iloc[:100].reset_index().drop("index", axis=1)
    # if self.is_anonymized:
    #     for x in self.anonymized:
    #         dtype = self.dtypes[x]
    #         if str(self.dtypes[x]) == "object":
    #             sample[x] = x
    #         else:
    #             sample[x] = np.random.choice(sample[x], sample.shape[0])
    #         assert sample[x].dtype == dtype
    n = min(sample.shape[0], 5)
    if n == 0:
        raise ValueError("Empty DataFrame, cannot make sample")
    sample = (
        sample.sample(n)
        .reset_index()
        .drop("index", axis=1)
        .drop("level_0", axis=1, errors="ignore")
    )
    sample.to_parquet(fname)
    print(f"Sample save as {os.path.abspath(fname)}")
    return sample
duration_longer_than
def duration_longer_than( self, **timedelta_kwargs )
View Source
def duration_longer_than(self, **timedelta_kwargs):
    return self[(self.end - self.time) >= timedelta(**timedelta_kwargs)]
duration_shorter_than
def duration_shorter_than( self, **timedelta_kwargs )
View Source
def duration_shorter_than(self, **timedelta_kwargs):
    return self[(self.end - self.time) <= timedelta(**timedelta_kwargs)]
get_type_from_registry
def get_type_from_registry( self, tp )
View Source
def get_type_from_registry(self, tp):
    for key, value in registry.items():
        if key.endswith(tp):
            return value
head
def head( self, *args, **kwargs )
View Source
def head(self, *args, **kwargs):
    return self.__class__(super().head(*args, **kwargs))
heartrate_above
def heartrate_above( self, value )
View Source
def heartrate_above(self, value):
    return self.heartrate_range(value)
heartrate_below
def heartrate_below( self, value )
View Source
def heartrate_below(self, value):
    return self.heartrate_range(None, value)
heartrate_range
def heartrate_range( self, low, high=None )
View Source
def heartrate_range(self, low, high=None):
    if "heartrate_value" not in self.columns:
        self.add_heartrate()
    if high is not None and low is not None:
        # parentheses needed: & binds tighter than the comparison
        return self[(self["heartrate_value"] >= low) & (self["heartrate_value"] < high)]
    if low is not None:
        return self[self["heartrate_value"] >= low]
    if high is not None:
        return self[self["heartrate_value"] < high]
in_a
def in_a( self, s )
View Source
def in_a(self, s):
    return self.near(s)
infer_time
def infer_time( self )
View Source
def infer_time(self):
    if self.__class__.__name__ == "Results":
        self._start_col, self._time_col, self._end_col = "start", "start", "end"
        return
    times = [x for x, y in zip(self.columns, self.dtypes) if "datetime" in str(y)]
    levels = [self.time_level(self[x]) for x in times]
    if not levels:
        raise ValueError(
            f"Either 1 or 2 columns should be of type datetime for {self.__class__.__name__} (0 found)"
        )
    max_level = max(levels)
    # workaround
    # start: 10:00:00
    # end:   10:00:59
    times = [t for t, l in zip(times, levels) if l == max_level or (l == 2 and max_level == 3)]
    num_times = len(times)
    self.num_times = num_times
    if num_times == 0:
        self._start_col, self._time_col, self._end_col = None, None, None
    elif num_times == 1:
        self._start_col, self._time_col, self._end_col = None, times[0], None
    elif num_times == 2:
        col1, col2 = times
        sub = self[self[col1].notnull() & self[col2].notnull()]
        a, b = sub[col1], sub[col2]
        if (a >= b).all():
            col1, col2 = col2, col1
        elif not (a <= b).all():
            raise ValueError(
                "Not strictly one col higher than other with dates, can't determine"
            )
        if col1 == "end" and col2 == "start":
            col2, col1 = col1, col2
        self._start_col, self._time_col, self._end_col = col1, col1, col2
        interval_index = pd.IntervalIndex.from_arrays(
            self[self._start_col], self[self._end_col]
        )
        self.set_index(interval_index, inplace=True)
        self.sort_index(inplace=True)
    else:
        msg = 'infer time failed: there can only be 1 or 2 datetime columns at the same granularity.'
        raise Exception(msg + " Found: " + str(times))
last
def last( self )
View Source
@nlp("filter", "last", "last time", "most recently") def last(self): _ = self.time # to get inferred time if not set col = self._time_col or self._start_col return self.__class__(self.sort_values(col, na_position="last", ascending=False).iloc[:1])
near
def near( self, s )
View Source
def near(self, s):
    if isinstance(s, NDF) and s.df_name.endswith("places"):
        selection = s
    else:
        selection = get_type_from_registry("places").containing(s)
    return self.when_at(selection)
not_at_day
def not_at_day( self, day_or_class )
View Source
def not_at_day(self, day_or_class):
    return self[~self._select_at_day(day_or_class)]
query
def query( self, expr )
View Source
def query(self, expr):
    return self.__class__(super().query(expr))
read
def read( self, index )
View Source
def read(self, index):
    return just.read(self.path[index])
received_by
def received_by( self, name=None, email=None, case=False )
View Source
def received_by(self, name=None, email=None, case=False):
    if name is not None and email is not None:
        a = self.receiver.str.contains(name, case=case, na=False)
        b = self.receiver.str.contains(email, case=case, na=False)
        res = self[a | b]
    elif name is not None:
        res = self[self.receiver.str.contains(name, case=case, na=False)]
    elif email is not None:
        res = self[self.receiver.str.contains(email, case=case, na=False)]
    return self.__class__(res)
sent_by
def sent_by( self, name=None, email=None, case=False )
View Source
def sent_by(self, name=None, email=None, case=False):
    if name is not None and email is not None:
        a = self.sender.str.contains(name, case=case, na=False)
        b = self.sender.str.contains(email, case=case, na=False)
        res = self[a | b]
    elif name is not None:
        res = self[self.sender.str.contains(name, case=case, na=False)]
    elif email is not None:
        res = self[self.sender.str.contains(email, case=case, na=False)]
    return self.__class__(res)
show_me
def show_me( self )
View Source
@nlp("end", "show", "show me", "show me the", "show the", "what") def show_me(self): _ = self.time # to get inferred time if not set col = self._time_col or self._start_col return self.__class__(self.sort_values(col, na_position="last", ascending=False))
sort_values
def sort_values( self, by, axis=0, ascending=True, inplace=False, kind='quicksort', na_position='last' )
View Source
def sort_values(
    self, by, axis=0, ascending=True, inplace=False, kind='quicksort', na_position='last'
):
    return self.__class__(
        pd.DataFrame.sort_values(self, by, axis, ascending, inplace, kind, na_position)
    )
tail
def tail( self, *args, **kwargs )
View Source
def tail(self, *args, **kwargs):
    return self.__class__(super().tail(*args, **kwargs))
take_from
def take_from( self, registry_ending, col_name )
View Source
def take_from(self, registry_ending, col_name):
    for registry_type in registry:
        if not registry_type.endswith(registry_ending):
            continue
        # TODO: loop over columns, so we only do index lookup once
        # TODO: do not only try self.time but also self.end
        new_name = registry_ending + "_" + col_name
        if new_name in self.columns:
            return self[new_name]
        tp = get_type_from_registry(registry_type)
        results = []
        if not self.inferred_time:
            self.infer_time()
        for x in self[self._time_col]:
            try:
                res = tp.loc[x]
                if not isinstance(res, pd.Series):
                    res = res.iloc[0]
                res = res[col_name]
            except (KeyError, TypeError):
                res = np.nan
            results.append(res)
        self[new_name] = results
        return self[new_name]
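A hedged sketch of how take_from is used: per row, it looks up a column from another registered source by timestamp. This assumes a source whose registry name ends in "heartrate" has been registered:

mail.take_from("heartrate", "value")   # adds a "heartrate_value" column via time lookup
calm = mail.heartrate_below(80)        # the heartrate_* helpers call take_from when needed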
time_level
def time_level( self, col )
View Source
def time_level(self, col):
    if (col.dt.microsecond != 0).any():
        return 4
    if (col.dt.second != 0).any():
        return 3
    if (col.dt.minute != 0).any():
        return 2
    if (col.dt.hour != 0).any():
        return 1
    return 0
to_html
def to_html( self )
View Source
def to_html(self):
    if self.selected_columns:
        data = pd.DataFrame({x: getattr(self, x) for x in self.selected_columns})
        return data.to_html()
    return super().to_html()
to_place
def to_place( self )
View Source
def to_place(self):
    results = []
    places = get_type_from_registry("places")
    for time in self.time:
        try:
            results.append(places.iloc[places.index.get_loc(time)].iloc[0])
        except (TypeError, KeyError):
            pass
    return places.__class__(results)
view
def view( self, index )
View Source
def view(self, index):
    view(self.path[index])
when
def when( self, other, **window_kwargs )
View Source
def when_at(self, other, **window_kwargs):
    if isinstance(other, str):
        other = get_type_from_registry("places").containing(other)
    return self.__class__(join_time(other, self, **window_kwargs))
when_at
def when_at( self, other, **window_kwargs )
View Source
def when_at(self, other, **window_kwargs):
    if isinstance(other, str):
        other = get_type_from_registry("places").containing(other)
    return self.__class__(join_time(other, self, **window_kwargs))
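Sketch of the time-overlap join against the places registry; the place name is illustrative:

at_the_office = mail.when_at("office")   # rows overlapping the times you were at a matching place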