Module nostalgia.sources.google.gmail

View Source
import just
import pandas as pd
from datetime import datetime

from nostalgia.times import tz, parse
from nostalgia.data_loading import read_array_of_dict_from_json
from nostalgia.sources.google import Google


def try_parse(x):
    try:
        if x.endswith("PST"):
            x = x.replace("PST", "-0800 (PST)")
        elif x.endswith("PDT"):
            x = x.replace("PDT", "-0700 (PDT)")
        d = parse(x)
        if d.tzinfo is None:
            d = d.replace(tzinfo=tz)
        return d
    except Exception:
        # unparseable dates fall back to the epoch in the local timezone
        return datetime(1970, 1, 1, 0, 0, 0, tzinfo=tz)


# class MBox:
#     ingest_settings = {
#         "ingest_glob": "~/Downloads/*.mbox",
#         "recent_only": False,
#         "delete_existing": False,
#     }


class Gmail(Google):
    me = []

    @classmethod
    def handle_dataframe_per_file(cls, data, fname):
        data["subject"] = data["subject"].astype(str)
        data["to"] = data["to"].astype(str)
        # prefer the address between angle brackets; fall back to the bare field
        data["sender"] = data["from"].str.extract("<([^>]+)>")
        data.loc[data["sender"].isnull(), "sender"] = data.loc[
            data["sender"].isnull(), "from"
        ].str.strip('"')
        data["sent"] = data.sender.str.contains("|".join(cls.me), na=False)
        data["receiver"] = data["to"].str.extract("<([^>]+)>")
        data.loc[data["receiver"].isnull(), "receiver"] = data.loc[data["receiver"].isnull(), "to"]
        data["received"] = data.receiver.str.contains("|".join(cls.me), na=False)
        data["timestamp"] = pd.to_datetime(
            [try_parse(x) for x in data.date], utc=True
        ).tz_convert(tz)
        data.drop("date", axis=1, inplace=True)
        return data

    @classmethod
    def load(cls, nrows=None, from_cache=True, **kwargs):
        dfs = [
            cls.load_data_file_modified_time(file_path, nrows=nrows, from_cache=from_cache)
            for file_path in just.glob("~/nostalgia_data/input/google/Takeout/Mail/*.mbox")
        ]
        dfs = [x for x in dfs if not x.empty]
        return cls(pd.concat(dfs))

    def sent_by(self, name=None, email=None, case=False):
        if name is not None and email is not None:
            a = self.sender.str.contains(name, case=case, na=False)
            b = self.sender.str.contains(email, case=case, na=False)
            res = self[a | b]
        elif name is not None:
            res = self[self.sender.str.contains(name, case=case, na=False)]
        elif email is not None:
            res = self[self.sender.str.contains(email, case=case, na=False)]
        else:
            # without this guard, `res` would be unbound when no filter is given
            raise ValueError("pass at least one of `name` or `email`")
        return self.__class__(res)

    def received_by(self, name=None, email=None, case=False):
        if name is not None and email is not None:
            a = self.receiver.str.contains(name, case=case, na=False)
            b = self.receiver.str.contains(email, case=case, na=False)
            res = self[a | b]
        elif name is not None:
            res = self[self.receiver.str.contains(name, case=case, na=False)]
        elif email is not None:
            res = self[self.receiver.str.contains(email, case=case, na=False)]
        else:
            raise ValueError("pass at least one of `name` or `email`")
        return self.__class__(res)

Functions

try_parse

def try_parse(
    x
)
View Source
def try_parse(x):
    try:
        if x.endswith("PST"):
            x = x.replace("PST", "-0800 (PST)")
        elif x.endswith("PDT"):
            x = x.replace("PDT", "-0700 (PDT)")
        d = parse(x)
        if d.tzinfo is None:
            d = d.replace(tzinfo=tz)
        return d
    except Exception:
        # unparseable dates fall back to the epoch in the local timezone
        return datetime(1970, 1, 1, 0, 0, 0, tzinfo=tz)
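
A minimal sketch of the behavior, assuming the module is importable as nostalgia.sources.google.gmail (the date strings are made up for illustration):

    from nostalgia.sources.google.gmail import try_parse

    try_parse("Mon, 2 Mar 2020 10:30:00 PST")  # "PST" is rewritten to "-0800 (PST)" before parsing
    try_parse("not a date")                    # unparseable: returns datetime(1970, 1, 1, tzinfo=tz)
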

Classes

Gmail

class Gmail(
    data
)
View Source
class Gmail(Google):
    me = []

    @classmethod
    def handle_dataframe_per_file(cls, data, fname):
        data["subject"] = data["subject"].astype(str)
        data["to"] = data["to"].astype(str)
        # prefer the address between angle brackets; fall back to the bare field
        data["sender"] = data["from"].str.extract("<([^>]+)>")
        data.loc[data["sender"].isnull(), "sender"] = data.loc[
            data["sender"].isnull(), "from"
        ].str.strip('"')
        data["sent"] = data.sender.str.contains("|".join(cls.me), na=False)
        data["receiver"] = data["to"].str.extract("<([^>]+)>")
        data.loc[data["receiver"].isnull(), "receiver"] = data.loc[data["receiver"].isnull(), "to"]
        data["received"] = data.receiver.str.contains("|".join(cls.me), na=False)
        data["timestamp"] = pd.to_datetime(
            [try_parse(x) for x in data.date], utc=True
        ).tz_convert(tz)
        data.drop("date", axis=1, inplace=True)
        return data

    @classmethod
    def load(cls, nrows=None, from_cache=True, **kwargs):
        dfs = [
            cls.load_data_file_modified_time(file_path, nrows=nrows, from_cache=from_cache)
            for file_path in just.glob("~/nostalgia_data/input/google/Takeout/Mail/*.mbox")
        ]
        dfs = [x for x in dfs if not x.empty]
        return cls(pd.concat(dfs))

    def sent_by(self, name=None, email=None, case=False):
        if name is not None and email is not None:
            a = self.sender.str.contains(name, case=case, na=False)
            b = self.sender.str.contains(email, case=case, na=False)
            res = self[a | b]
        elif name is not None:
            res = self[self.sender.str.contains(name, case=case, na=False)]
        elif email is not None:
            res = self[self.sender.str.contains(email, case=case, na=False)]
        else:
            # without this guard, `res` would be unbound when no filter is given
            raise ValueError("pass at least one of `name` or `email`")
        return self.__class__(res)

    def received_by(self, name=None, email=None, case=False):
        if name is not None and email is not None:
            a = self.receiver.str.contains(name, case=case, na=False)
            b = self.receiver.str.contains(email, case=case, na=False)
            res = self[a | b]
        elif name is not None:
            res = self[self.receiver.str.contains(name, case=case, na=False)]
        elif email is not None:
            res = self[self.receiver.str.contains(email, case=case, na=False)]
        else:
            raise ValueError("pass at least one of `name` or `email`")
        return self.__class__(res)
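
A minimal usage sketch, assuming Takeout mbox files already sit in ~/nostalgia_data/input/google/Takeout/Mail/; the addresses are hypothetical, and `me` is empty by default:

    from nostalgia.sources.google.gmail import Gmail

    Gmail.me = ["alice@example.com"]   # your own addresses, used for the sent/received flags
    mail = Gmail.load(nrows=100)       # small sample; loads every *.mbox in the Takeout dir
    mine = mail.sent_by(email="alice@example.com")
    print(mine[["sender", "receiver", "subject", "timestamp"]].head())
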

Ancestors (in MRO)

  • nostalgia.sources.google.Google
  • nostalgia.ndf.NDF
  • nostalgia.anonymizer.Anonymizer
  • nostalgia.data_loading.Loader

Class variables

anonymized
ingest_settings
keywords
me
nlp_columns
nlp_when
selected_columns
vendor

Static methods

anonymize
def anonymize()

View Source
    @classmethod
    def anonymize(cls):
        global ANONYMIZED
        ANONYMIZED = True

class_df_name
def class_df_name()

View Source
    @classmethod
    def class_df_name(cls):
        name = normalize_name(cls.__name__)
        if cls.vendor is not None and not name.startswith(cls.vendor):
            name = cls.vendor + "_" + name
        return name

df_label
def df_label()

View Source
    @classmethod
    def df_label(cls):
        return normalize_name(cls.__name__).replace("_", " ").title()

get_normalized_name
def get_normalized_name()

View Source
    @classmethod
    def get_normalized_name(cls):
        return normalize_name(cls.__name__)

get_schema
def get_schema(
    *args,
    **kwargs
)
View Source
    @classmethod
    def get_schema(cls, *args, **kwargs):
        sample = cls.load(*args, nrows=5, **kwargs)
        return {k: v for k, v in zip(sample.columns, sample.dtypes)}
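
A usage sketch; the column names come from this class, but the exact dtypes depend on your data:

    schema = Gmail.get_schema()
    # e.g. {'subject': dtype('O'), 'sender': dtype('O'), 'timestamp': datetime64[ns, ...], ...}
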
handle_dataframe_per_file
def handle_dataframe_per_file(
    data,
    fname
)
View Source
    @classmethod
    def handle_dataframe_per_file(cls, data, fname):
        data["subject"] = data["subject"].astype(str)
        data["to"] = data["to"].astype(str)
        # prefer the address between angle brackets; fall back to the bare field
        data["sender"] = data["from"].str.extract("<([^>]+)>")
        data.loc[data["sender"].isnull(), "sender"] = data.loc[
            data["sender"].isnull(), "from"
        ].str.strip('"')
        data["sent"] = data.sender.str.contains("|".join(cls.me), na=False)
        data["receiver"] = data["to"].str.extract("<([^>]+)>")
        data.loc[data["receiver"].isnull(), "receiver"] = data.loc[data["receiver"].isnull(), "to"]
        data["received"] = data.receiver.str.contains("|".join(cls.me), na=False)
        data["timestamp"] = pd.to_datetime(
            [try_parse(x) for x in data.date], utc=True
        ).tz_convert(tz)
        data.drop("date", axis=1, inplace=True)
        return data

ingest
def ingest()

View Source
    @classmethod
    def ingest(cls):
        load_from_download(vendor=cls.vendor, **cls.ingest_settings)

is_anonymized
def is_anonymized()

View Source
    @classmethod
    def is_anonymized(cls):
        return ANONYMIZED

latest_file_is_historic
def latest_file_is_historic(
    glob,
    key_name='',
    nrows=None,
    from_cache=True
)

The glob is a wildcard pattern; the most recently created matching file is loaded. See load_data_file_modified_time for further reference. Returns a pd.DataFrame.

View Source
    @classmethod
    def latest_file_is_historic(cls, glob, key_name="", nrows=None, from_cache=True):
        """
        The glob is a wildcard pattern; the most recently created matching file is loaded.
        See `load_data_file_modified_time` for further reference.
        Returns a pd.DataFrame.
        """
        recent = max([x for x in just.glob(glob) if "(" not in x], key=os.path.getctime)
        return cls.load_data_file_modified_time(recent, key_name, nrows, from_cache)
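
A usage sketch with a hypothetical glob; whichever matching file was created last is loaded. Note that paths containing "(" are skipped, which filters out browser duplicates such as export (1).mbox:

    df = Gmail.latest_file_is_historic("~/Downloads/*.mbox")
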
load
def load(
    nrows=None,
    from_cache=True,
    **kwargs
)
View Source
    @classmethod
    def load(cls, nrows=None, from_cache=True, **kwargs):
        dfs = [
            cls.load_data_file_modified_time(file_path, nrows=nrows, from_cache=from_cache)
            for file_path in just.glob("~/nostalgia_data/input/google/Takeout/Mail/*.mbox")
        ]
        dfs = [x for x in dfs if not x.empty]
        return cls(pd.concat(dfs))

load_data_file_modified_time
def load_data_file_modified_time(
    fname,
    key_name='',
    nrows=None,
    from_cache=True,
    **kwargs
)

Loads from cache if the file has not changed since the last run (and a cache exists). If it has changed, the file is reprocessed and the result saved to cache (along with the modified time). Currently handles csv, ics, mbox and json. key_name applies only to json. nrows enables quickly loading a sample. from_cache=False ignores the cache and reprocesses the file.

Loading the csv, ics, json or mbox file yields a DataFrame. IMPORTANT: this assumes you implement handle_dataframe_per_file, the post-processing required after the file is loaded, e.g. converting times and dropping or adding columns.

View Source
    @classmethod
    def load_data_file_modified_time(
        cls, fname, key_name="", nrows=None, from_cache=True, **kwargs
    ):
        """
        Loads from cache if the file has not changed since the last run (and a cache exists).
        If it has changed, the file is reprocessed and the result saved to cache
        (along with the modified time).
        Currently handles csv, ics, mbox and json.
        `key_name` applies only to json.
        `nrows` enables quickly loading a sample.
        `from_cache=False` ignores the cache and reprocesses the file.
        Loading the csv, ics, json or mbox file yields a DataFrame.
        IMPORTANT: this assumes you implement `handle_dataframe_per_file`,
        the post-processing required after the file is loaded,
        e.g. converting times and dropping or adding columns.
        """
        name = fname + "_" + normalize_name(cls.__name__)
        modified_time = os.path.getmtime(os.path.expanduser(fname))
        last_modified = get_last_mod_time(name)
        if modified_time != last_modified or not from_cache:
            if fname.endswith(".csv"):
                data = pd.read_csv(fname, error_bad_lines=False, nrows=nrows, **kwargs)
            elif fname.endswith(".ics"):
                from icalevents.icalevents import events

                evs = events(file=fname, start=datetime.fromtimestamp(0), end=datetime.now())
                data = [
                    {
                        "title": ev.summary,
                        "description": ev.description,
                        "location": ev.location,
                        "start": ev.start,
                        "end": ev.end,
                    }
                    for ev in evs
                ]
                data = pd.DataFrame(data)
            elif fname.endswith(".mbox"):
                import mailbox

                m = mailbox.mbox(fname)
                data = pd.DataFrame(
                    [{l: x[l] for l in ["from", "to", "date", "subject"]} for x in m]
                )
            else:
                data = read_array_of_dict_from_json(fname, key_name, nrows, **kwargs)
            data = cls.handle_dataframe_per_file(data, fname)
            if nrows is None:
                save_df(data, name)
                save_last_mod_time(modified_time, name)
        else:
            data = load_df(name, nrows)
        if nrows is not None:
            data = data.iloc[-nrows:]
        return data
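
The caching contract in a sketch (the path is hypothetical):

    fname = "~/nostalgia_data/input/google/Takeout/Mail/example.mbox"
    df = Gmail.load_data_file_modified_time(fname)                    # first run: parse, then cache
    df = Gmail.load_data_file_modified_time(fname)                    # mtime unchanged: read from cache
    df = Gmail.load_data_file_modified_time(fname, from_cache=False)  # force reprocessing
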
load_dataframe_per_json_file
def load_dataframe_per_json_file(
    glob_pattern,
    key='',
    nrows=None
)
View Source
    @classmethod
    def load_dataframe_per_json_file(cls, glob_pattern, key="", nrows=None):
        fnames = set(just.glob(glob_pattern))
        name = glob_pattern + "_" + normalize_name(cls.__name__)
        processed_files = get_processed_files(name)
        to_process = fnames.difference(processed_files)
        objects = []
        if nrows is not None:
            if not to_process:
                to_process = list(processed_files)[-nrows:]
            else:
                to_process = list(to_process)[-nrows:]
        if to_process:
            print("processing {} files".format(len(to_process)))
            for fname in to_process:
                data = read_array_of_dict_from_json(fname, key, nrows)
                data = cls.handle_dataframe_per_file(data, fname)
                if data is None:
                    continue
                objects.append(data)
            data = pd.concat(objects)
            if processed_files and nrows is None:
                data = pd.concat((data, load_df(name)))
            for x in ["time", "start", "end"]:
                if x in data:
                    data = data.sort_values(x)
                    break
            if nrows is None:
                save_df(data, name)
                save_processed_files(fnames | processed_files, name)
        else:
            data = load_df(name)
        if nrows is not None:
            data = data.iloc[-nrows:]
        return data

load_df
def load_df(
    nrows
)
View Source
    @classmethod
    def load_df(cls, nrows):
        return load_df(cls.get_normalized_name(), nrows)

load_image_texts
def load_image_texts(
    glob_pattern_s,
    nrows=None
)
View Source
    @classmethod
    def load_image_texts(cls, glob_pattern_s, nrows=None):
        import pytesseract
        from PIL import Image

        if isinstance(glob_pattern_s, list):
            fnames = set()
            for glob_pattern in glob_pattern_s:
                fnames.update(set(just.glob(glob_pattern)))
            glob_pattern = "_".join(glob_pattern_s)
        else:
            fnames = set(just.glob(glob_pattern))
        name = glob_pattern + "_" + normalize_name(cls.__name__)
        processed_files = get_processed_files(name)
        to_process = fnames.difference(processed_files)
        objects = []
        cache = get_cache("tesseract")
        if nrows is not None:
            if not to_process:
                return load_df(name).iloc[-nrows:]
            else:
                to_process = list(to_process)[-nrows:]
        if to_process:
            for fname in to_process:
                if fname in cache:
                    text = cache[fname]
                else:
                    try:
                        text = pytesseract.image_to_string(Image.open(just.make_path(fname)))
                    except OSError as e:
                        print("ERR", fname, e)
                        continue
                    cache[fname] = text
                time = datetime_from_timestamp(os.path.getmtime(fname), "utc")
                data = {"text": text, "path": fname, "title": fname.split("/")[-1], "time": time}
                objects.append(data)
            data = pd.DataFrame(objects)
            if processed_files and nrows is None:
                data = pd.concat((data, load_df(name)))
            for x in ["time", "start", "end"]:
                if x in data:
                    data = data.sort_values(x)
                    break
            if nrows is None:
                save_df(data, name)
                save_processed_files(fnames | processed_files, name)
        else:
            data = load_df(name)
        if nrows is not None:
            data = data.iloc[-nrows:]
        return data

load_json_file_modified_time
def load_json_file_modified_time(
    fname,
    nrows=None,
    from_cache=True,
    **kwargs
)
View Source
    @classmethod
    def load_json_file_modified_time(cls, fname, nrows=None, from_cache=True, **kwargs):
        name = fname + "_" + normalize_name(cls.__name__)
        modified_time = os.path.getmtime(os.path.expanduser(fname))
        last_modified = get_last_mod_time(name)
        if modified_time != last_modified or not from_cache:
            data = just.read(fname)
            data = cls.handle_json(data, **kwargs)
            data = pd.DataFrame(data)
            if nrows is None:
                save_df(data, name)
                save_last_mod_time(modified_time, name)
        else:
            data = load_df(name)
        if nrows is not None:
            data = data.iloc[-nrows:]
        return data

load_object_per_newline
def load_object_per_newline(
    fname,
    nrows=None
)

Iterates over a file containing one object per line (e.g. .jsonl or .txt). Only lines not seen in an earlier run are handled; this is detected by storing the number of objects seen. You should implement object_to_row(cls, row) on your class, returning a dictionary (or None to skip a row).

View Source
    @classmethod
    def load_object_per_newline(cls, fname, nrows=None):
        """
        Iterates over a file containing one object per line (e.g. .jsonl or .txt).
        Only lines not seen in an earlier run are handled; this is detected by
        storing the number of objects seen.
        You should implement `object_to_row(cls, row)` on your class,
        returning a dictionary (or None to skip a row).
        """
        data = []
        name = fname + "_" + normalize_name(cls.__name__)
        newline_count = get_newline_count(name)
        for i, x in enumerate(just.iread(fname)):
            if nrows is None:
                if i < newline_count:
                    continue
            row = cls.object_to_row(x)
            if row is None:
                continue
            data.append(row)
            # when sampling, stop after approximately `nrows` rows
            if nrows is not None and i > nrows:
                break
        if data:
            data = pd.DataFrame(data)
            if newline_count and nrows is None:
                data = pd.concat((data, load_df(name)))
            if nrows is None:
                data = save_df(data, name)
                n = i + 1
                save_newline_count(n, name)
        else:
            data = load_df(name)
        if nrows is not None:
            data = data.iloc[-nrows:]
        return data
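
A minimal sketch of the object_to_row contract; the class, the field names, and the shape of each deserialized line are assumptions for illustration:

    from nostalgia.ndf import NDF

    class Tweets(NDF):  # hypothetical source class
        @classmethod
        def object_to_row(cls, obj):
            # `obj` is one deserialized line; returning None skips it
            if "text" not in obj:
                return None
            return {"text": obj["text"], "time": obj["created_at"]}

    df = Tweets.load_object_per_newline("~/nostalgia_data/input/tweets.jsonl")
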
load_sample_data
def load_sample_data()

View Source
    @classmethod
    def load_sample_data(cls):
        nostalgia_dir = os.path.dirname(nostalgia.__file__)
        fname = os.path.join(nostalgia_dir, "data/samples/" + cls.class_df_name() + ".parquet")
        if os.path.exists(fname):
            print("loaded method 1")
            df = pd.read_parquet(fname)
        else:
            import pkgutil
            from io import BytesIO

            nostalgia_dir = os.path.dirname(nostalgia.__file__)
            sample_name = "data/samples/" + cls.class_df_name() + ".parquet"
            data = pkgutil.get_data("nostalgia", sample_name)
            print("loaded method 2")
            df = pd.read_parquet(BytesIO(data))
        return cls(df)

register
def register()

View Source
    @classmethod
    def register(cls):
        return cls.load(nrows=5)

reveal
def reveal()

View Source
    @classmethod
    def reveal(cls):
        global ANONYMIZED
        ANONYMIZED = False

save_df
def save_df(
    df,
    name=None
)
View Source
    @classmethod
    def save_df(cls, df, name=None):
        return save_df(df, name or cls.get_normalized_name())

Instance variables

at_home
at_work
df_name
duration
during_office_hours
end
in_office_days
in_office_hours
last_day
last_month
last_week
last_year
outside_office_hours
start
text_cols
time
when_asleep
yesterday

Methods

add_heartrate
def add_heartrate(
    self
)
View Source
    def add_heartrate(self):
        return self.take_from("heartrate", "value")

as_simple
def as_simple(
    self,
    max_n=None
)
View Source
    def as_simple(self, max_n=None):
        data = {
            "title": self.df_name,  # default, to be overwritten
            "url": None,
            "start": None,
            "end": None,
            # "body": None,
            "type": self.df_name,
            "interval": True,
            "sender": None,
            "value": getattr(self, "value", None),
            "index_loc": self.index,
        }
        for x in ["title", "name", "naam", "subject", "url", "content", "text", "value"]:
            res = getattr(self, x, None)
            if res is not None:
                data["title"] = res
                break
        res = getattr(self, "sender", None)
        if res is not None:
            data["sender"] = res
        for x in ["url", "path", "file"]:
            res = getattr(self, x, None)
            if res is not None:
                data["url"] = res
                break
        for x in ["start", "time", "timestamp"]:
            res = getattr(self, x, None)
            if res is not None:
                data["start"] = res
                break
        for x in ["end"]:
            res = getattr(self, x, None)
            if res is not None:
                data["end"] = res - pd.Timedelta(microseconds=1)
                break
        if data["end"] is None:
            data["end"] = data["start"] + pd.Timedelta(minutes=5)
            data["interval"] = False
        try:
            data = pd.DataFrame(data).sort_values("start")
            if max_n is not None:
                data = data.iloc[-max_n:]
            return data
        except ValueError:
            raise ValueError(f"No fields are mapped for {self.__class__.__name__}")

at
def at(
    self,
    time_or_place
)
View Source
    def at(self, time_or_place):
        if isinstance(time_or_place, NDF) and time_or_place.df_name.endswith("places"):
            return self.when_at(time_or_place)
        if isinstance(time_or_place, str):
            mp = parse_date_tz(time_or_place)
            if mp:
                start = mp.start_date
                end = mp.end_date
                return self.at_time(start, end)
            else:
                return self.when_at(get_type_from_registry("places").containing(time_or_place))
        raise ValueError("neither time nor place was passed")

at_day
def at_day(
    self,
    day_or_class
)
View Source
    def at_day(self, day_or_class):
        return self[self._select_at_day(day_or_class)]

at_night
def at_night(
    self,
    start=22,
    end=8
)
View Source
    def at_night(self, start=22, end=8):
        return self.between_hours(start, end)

at_time
def at_time(
    self,
    start,
    end=None,
    sort_diff=True,
    **window_kwargs
)
View Source
    def at_time(self, start, end=None, sort_diff=True, **window_kwargs):
        if is_mp(start):
            # read both bounds before overwriting `start`
            start, end = start.start_date, start.end_date
        elif isinstance(start, str) and end is None:
            mp = parse_date_tz(start)
            start = mp.start_date
            end = mp.end_date
        elif isinstance(start, str) and isinstance(end, str):
            mp = parse_date_tz(start)
            start = mp.start_date
            mp = parse_date_tz(end)
            end = mp.end_date
        elif end is None and window_kwargs:
            end = start
        elif end is None:
            raise ValueError(
                "Either a metaperiod, a date string, 2 times, or time + window_kwargs."
            )
        self.infer_time()
        if window_kwargs:
            start = start - pd.Timedelta(**window_kwargs)
            end = end + pd.Timedelta(**window_kwargs)
        if self._start_col is None:
            res = self[ab_overlap_c(start, end, self[self._time_col])]
        else:
            res = self[ab_overlap_cd(self[self._start_col], self[self._end_col], start, end)]
        if not res.empty and sort_diff:
            # avg_time = start + (end - start) / 2
            # res["sort_score"] = -abs(res[self._time_col] - avg_time)
            # res = res.sort_values('sort_score').drop('sort_score', axis=1)
            res["sort_score"] = res[self._time_col]
            res = res.sort_values('sort_score').drop('sort_score', axis=1)
        return self.__class__(res)
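
The accepted argument shapes, as a sketch (df and the timestamp are placeholders):

    df.at_time("last tuesday")              # one natural-language string
    df.at_time("2020-03-01", "2020-03-07")  # two date strings
    df.at_time(some_timestamp, hours=2)     # one time, widened 2 hours on both sides
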
between_hours
def between_hours(
    self,
    start=22,
    end=8
)
View Source
    def between_hours(self, start=22, end=8):
        if self._start_col is not None:
            return self[(self.start.dt.hour > start) | (self.end.dt.hour < end)]
        # the default window wraps around midnight, so combine with OR
        # (matching the start/end branch above) rather than AND
        return self[(self.time.dt.hour > start) | (self.time.dt.hour < end)]

browsing
def browsing(
    self,
    other,
    **window_kwargs
)
View Source
    def browsing(self, other, **window_kwargs):
        if isinstance(other, str):
            other = get_type_from_registry("browser").containing(other)
        return self.__class__(join_time(other, self, **window_kwargs))

by_me
def by_me(
    self
)
View Source
    @nlp("filter", "by me", "i", "my")
    def by_me(self):
        return self

col_contains
def col_contains(
    self,
    string,
    col_name,
    case=False,
    regex=False,
    na=False
)
View Source
    def col_contains(self, string, col_name, case=False, regex=False, na=False):
        return self[self[col_name].str.contains(string, case=case, regex=regex, na=na)]

containing
def containing(
    self,
    string,
    col_name=None,
    case=False,
    regex=True,
    na=False,
    bound=True
)

Filters on string in all text columns when col_name is None, otherwise only in that column. bound=True adds word boundaries to the regex. case=True makes the match case-sensitive. regex=True treats string as a regular expression. na=False treats NaN values as non-matches.

View Source
    def containing(self, string, col_name=None, case=False, regex=True, na=False, bound=True):
        """
        Filters on `string` in all text columns when col_name is None, otherwise only in that one.
        `bound=True` adds word boundaries to the regex.
        `case=True` makes the match case-sensitive.
        `regex=True` treats `string` as a regular expression.
        `na=False` treats NaN values as non-matches.
        """
        if regex and bound:
            string = r"\b" + string + r"\b"
        if col_name is not None:
            return self.col_contains(string, col_name, case, regex, na)
        bool_cols = [
            self[x].str.contains(string, case=case, regex=regex, na=na) for x in self.text_cols
        ]
        bool_array = bool_cols[0]
        for b in bool_cols[1:]:
            bool_array = np.logical_or(bool_array, b)
        return self.__class__(self[bool_array])
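
Usage sketch (the column and search terms are examples):

    df.containing("invoice")                    # word-bounded, case-insensitive, all text columns
    df.containing("amazon", col_name="sender")  # only match in one column
    df.containing(r"report_\d+", bound=False)   # raw regex, no added word boundaries
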
count
def count(
    self
)
View Source
    @nlp("end", "how many", "how many times", "how often")
    def count(self):
        return self.shape[0]

create_sample_data
def create_sample_data(
    self
)
View Source
    def create_sample_data(self):
        nostalgia_dir = os.path.dirname(nostalgia.__file__)
        fname = os.path.join(nostalgia_dir, "data/samples/" + self.df_name + ".parquet")
        # verify that we can process it
        _ = self.as_simple()
        sample = self.iloc[:100].reset_index().drop("index", axis=1)
        # if self.is_anonymized:
        #     for x in self.anonymized:
        #         dtype = self.dtypes[x]
        #         if str(self.dtypes[x]) == "object":
        #             sample[x] = x
        #         else:
        #             sample[x] = np.random.choice(sample[x], sample.shape[0])
        #         assert sample[x].dtype == dtype
        n = min(sample.shape[0], 5)
        if n == 0:
            raise ValueError("Empty DataFrame, cannot make sample")
        sample = (
            sample.sample(n)
            .reset_index()
            .drop("index", axis=1)
            .drop("level_0", axis=1, errors="ignore")
        )
        sample.to_parquet(fname)
        print(f"Sample saved as {os.path.abspath(fname)}")
        return sample

duration_longer_than
def duration_longer_than(
    self,
    **timedelta_kwargs
)
View Source
    def duration_longer_than(self, **timedelta_kwargs):
        return self[(self.end - self.time) >= timedelta(**timedelta_kwargs)]

duration_shorter_than
def duration_shorter_than(
    self,
    **timedelta_kwargs
)
View Source
    def duration_shorter_than(self, **timedelta_kwargs):
        return self[(self.end - self.time) <= timedelta(**timedelta_kwargs)]

get_type_from_registry
def get_type_from_registry(
    self,
    tp
)
View Source
    def get_type_from_registry(self, tp):
        for key, value in registry.items():
            if key.endswith(tp):
                return value

head
def head(
    self,
    *args,
    **kwargs
)
View Source
    def head(self, *args, **kwargs):
        return self.__class__(super().head(*args, **kwargs))

heartrate_above
def heartrate_above(
    self,
    value
)
View Source
    def heartrate_above(self, value):
        return self.heartrate_range(value)

heartrate_below
def heartrate_below(
    self,
    value
)
View Source
    def heartrate_below(self, value):
        return self.heartrate_range(None, value)

heartrate_range
def heartrate_range(
    self,
    low,
    high=None
)
View Source
    def heartrate_range(self, low, high=None):
        if "heartrate_value" not in self.columns:
            self.add_heartrate()
        if high is not None and low is not None:
            # parenthesize both comparisons: `&` binds tighter than `<`
            return self[(self["heartrate_value"] >= low) & (self["heartrate_value"] < high)]
        if low is not None:
            return self[self["heartrate_value"] >= low]
        if high is not None:
            return self[self["heartrate_value"] < high]

in_a
def in_a(
    self,
    s
)
View Source
    def in_a(self, s):
        return self.near(s)

infer_time
def infer_time(
    self
)
View Source
    def infer_time(self):
        if self.__class__.__name__ == "Results":
            self._start_col, self._time_col, self._end_col = "start", "start", "end"
            return
        times = [x for x, y in zip(self.columns, self.dtypes) if "datetime" in str(y)]
        levels = [self.time_level(self[x]) for x in times]
        if not levels:
            raise ValueError(
                f"Either 1 or 2 columns should be of type datetime for {self.__class__.__name__} (0 found)"
            )
        max_level = max(levels)
        # workaround for seconds-level precision in one column only, e.g.
        # start: 10:00:00
        # end:   10:00:59
        times = [t for t, l in zip(times, levels) if l == max_level or (l == 2 and max_level == 3)]
        num_times = len(times)
        self.num_times = num_times
        if num_times == 0:
            self._start_col, self._time_col, self._end_col = None, None, None
        elif num_times == 1:
            self._start_col, self._time_col, self._end_col = None, times[0], None
        elif num_times == 2:
            col1, col2 = times
            sub = self[self[col1].notnull() & self[col2].notnull()]
            a, b = sub[col1], sub[col2]
            if (a >= b).all():
                col1, col2 = col2, col1
            elif not (a <= b).all():
                raise ValueError(
                    "Not strictly one col higher than other with dates, can't determine"
                )
            if col1 == "end" and col2 == "start":
                col2, col1 = col1, col2
            self._start_col, self._time_col, self._end_col = col1, col1, col2
            interval_index = pd.IntervalIndex.from_arrays(
                self[self._start_col], self[self._end_col]
            )
            self.set_index(interval_index, inplace=True)
            self.sort_index(inplace=True)
        else:
            msg = "infer time failed: there can only be 1 or 2 datetime columns at the same granularity."
            raise Exception(msg + " Found: " + str(times))

last
def last(
    self
)
View Source
    @nlp("filter", "last", "last time", "most recently")
    def last(self):
        _ = self.time  # to get inferred time if not set
        col = self._time_col or self._start_col
        return self.__class__(self.sort_values(col, na_position="last", ascending=False).iloc[:1])

near
def near(
    self,
    s
)
View Source
    def near(self, s):
        if isinstance(s, NDF) and s.df_name.endswith("places"):
            selection = s
        else:
            selection = get_type_from_registry("places").containing(s)
        return self.when_at(selection)

not_at_day
def not_at_day(
    self,
    day_or_class
)
View Source
    def not_at_day(self, day_or_class):
        return self[~self._select_at_day(day_or_class)]

query
def query(
    self,
    expr
)
View Source
    def query(self, expr):
        return self.__class__(super().query(expr))

read
def read(
    self,
    index
)
View Source
    def read(self, index):
        return just.read(self.path[index])

received_by
def received_by(
    self,
    name=None,
    email=None,
    case=False
)
View Source
    def received_by(self, name=None, email=None, case=False):
        if name is not None and email is not None:
            a = self.receiver.str.contains(name, case=case, na=False)
            b = self.receiver.str.contains(email, case=case, na=False)
            res = self[a | b]
        elif name is not None:
            res = self[self.receiver.str.contains(name, case=case, na=False)]
        elif email is not None:
            res = self[self.receiver.str.contains(email, case=case, na=False)]
        else:
            # without this guard, `res` would be unbound when no filter is given
            raise ValueError("pass at least one of `name` or `email`")
        return self.__class__(res)

sent_by
def sent_by(
    self,
    name=None,
    email=None,
    case=False
)
View Source
    def sent_by(self, name=None, email=None, case=False):
        if name is not None and email is not None:
            a = self.sender.str.contains(name, case=case, na=False)
            b = self.sender.str.contains(email, case=case, na=False)
            res = self[a | b]
        elif name is not None:
            res = self[self.sender.str.contains(name, case=case, na=False)]
        elif email is not None:
            res = self[self.sender.str.contains(email, case=case, na=False)]
        else:
            # without this guard, `res` would be unbound when no filter is given
            raise ValueError("pass at least one of `name` or `email`")
        return self.__class__(res)

show_me
def show_me(
    self
)
View Source
    @nlp("end", "show", "show me", "show me the", "show the", "what")
    def show_me(self):
        _ = self.time  # to get inferred time if not set
        col = self._time_col or self._start_col
        return self.__class__(self.sort_values(col, na_position="last", ascending=False))

sort_values
def sort_values(
    self,
    by,
    axis=0,
    ascending=True,
    inplace=False,
    kind='quicksort',
    na_position='last'
)
View Source
    def sort_values(
        self, by, axis=0, ascending=True, inplace=False, kind='quicksort', na_position='last'
    ):
        return self.__class__(
            pd.DataFrame.sort_values(self, by, axis, ascending, inplace, kind, na_position)
        )

tail
def tail(
    self,
    *args,
    **kwargs
)
View Source
    def tail(self, *args, **kwargs):
        return self.__class__(super().tail(*args, **kwargs))

take_from
def take_from(
    self,
    registry_ending,
    col_name
)
View Source
    def take_from(self, registry_ending, col_name):
        for registry_type in registry:
            if not registry_type.endswith(registry_ending):
                continue
            # TODO: loop over columns, so we only do index lookup once
            # TODO: do not only try self.time but also self.end
            new_name = registry_ending + "_" + col_name
            if new_name in self.columns:
                return self[new_name]
            tp = get_type_from_registry(registry_type)
            results = []
            if not self.inferred_time:
                self.infer_time()
            for x in self[self._time_col]:
                try:
                    res = tp.loc[x]
                    if not isinstance(res, pd.Series):
                        res = res.iloc[0]
                    res = res[col_name]
                except (KeyError, TypeError):
                    res = np.nan
                results.append(res)
            self[new_name] = results
            return self[new_name]

time_level
def time_level(
    self,
    col
)
View Source
    def time_level(self, col):
        if (col.dt.microsecond != 0).any():
            return 4
        if (col.dt.second != 0).any():
            return 3
        if (col.dt.minute != 0).any():
            return 2
        if (col.dt.hour != 0).any():
            return 1
        return 0
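
In other words, the returned level is the finest nonzero time unit present in the column: 4 = microseconds, 3 = seconds, 2 = minutes, 1 = hours, 0 = dates only. A sketch with a hypothetical column:

    col = pd.to_datetime(pd.Series(["2020-01-01 10:30", "2020-01-02 11:45"]))
    df.time_level(col)  # -> 2, since minutes are the finest nonzero unit
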
to_html
def to_html(
    self
)
View Source
    def to_html(self):
        if self.selected_columns:
            data = pd.DataFrame({x: getattr(self, x) for x in self.selected_columns})
            return data.to_html()
        return super().to_html()

to_place
def to_place(
    self
)
View Source
    def to_place(self):
        results = []
        places = get_type_from_registry("places")
        for time in self.time:
            try:
                results.append(places.iloc[places.index.get_loc(time)].iloc[0])
            except (TypeError, KeyError):
                pass
        return places.__class__(results)

view
def view(
    self,
    index
)
View Source
    def view(self, index):
        view(self.path[index])

when
def when(
    self,
    other,
    **window_kwargs
)
View Source
    def when_at(self, other, **window_kwargs):
        if isinstance(other, str):
            other = get_type_from_registry("places").containing(other)
        return self.__class__(join_time(other, self, **window_kwargs))

when_at
def when_at(
    self,
    other,
    **window_kwargs
)
View Source
    def when_at(self, other, **window_kwargs):
        if isinstance(other, str):
            other = get_type_from_registry("places").containing(other)
        return self.__class__(join_time(other, self, **window_kwargs))