import json
from copy import deepcopy
from itertools import chain
from .util import JsonEncoder
from .row import Row
from .util import resolve
from .util import to_dict
from .attrdict import AttrDict
class _ChainedIterable:
    """Re-iterable concatenation of several iterables.

    ``itertools.chain`` returns a one-shot iterator. Holding the parts and
    building a fresh chain on every ``iter()`` call lets the concatenation be
    traversed more than once (as long as the parts themselves can be).
    """

    def __init__(self, *parts):
        self.parts = parts

    def __iter__(self):
        return chain.from_iterable(self.parts)

    def __repr__(self):
        return f"chain({', '.join(repr(part) for part in self.parts)})"


class Collection:
    """Thin wrapper around an iterable of items with export/transform helpers."""

    def __init__(self, iterable):
        self.iterable = iterable

    def __iter__(self):
        return iter(self.iterable)

    def __repr__(self):
        # Subclasses that never set ``iterable`` fall back to the next
        # ``__repr__`` in the MRO.
        if hasattr(self, "iterable"):
            return f"Collection({repr(self.iterable)})"
        return super().__repr__()

    def __add__(self, other):
        """Return a lazy Collection yielding the items of ``self`` then ``other``.

        The result can be iterated repeatedly; wrapping a bare
        ``itertools.chain`` object would be exhausted after the first pass.
        """
        return Collection(_ChainedIterable(self, other))

    def to_list(self):
        """Return a list of item objects from the Collection"""
        return ListCollection(self)

    def to_records(self, item_class=dict, collection_class=list):
        """Return a list of dictionaries containing item data in ordinary Python types

        Useful for ingesting into NoSQL databases. ``item_class`` and
        ``collection_class`` are forwarded to the ``to_dict`` helper.
        """
        return to_dict(self, item_class, collection_class)

    def to_mongo(self):
        """Return a list of dictionaries containing MongoDB compatible datatypes

        Calls the ``to_dict`` helper with ``dict``/``list`` containers and
        ``convert_dates=True``.
        """
        return to_dict(self, dict, list, convert_dates=True)

    def to_json(self, *args, **kwargs) -> str:
        """Convert objects to JSON format

        Extra positional and keyword arguments are forwarded to
        ``json.dumps``; ``cls`` is always ``JsonEncoder`` and must not be
        passed by the caller.
        """
        return json.dumps(list(self.to_records()), *args, cls=JsonEncoder, **kwargs)

    def to_pandas(self, annotate=None):
        """Convert Collection into a Pandas DataFrame

        Each item becomes one row: ``item.to_pandas()`` is used when the item
        provides it, otherwise the item is passed to ``pandas.Series``.

        :param annotate: optional iterable of attribute paths; each one is
            resolved on the item with ``resolve`` and stored in the row under
            the path itself. ``None`` (the default) adds nothing.
        """
        # Imported here rather than at module level, so importing this module
        # does not require pandas.
        import pandas as pd

        annotate = () if annotate is None else annotate
        list_of_series = []
        for item in self:
            try:
                series = item.to_pandas()
            except AttributeError:
                series = pd.Series(item)
            for path in annotate:
                series[path] = resolve(item, path)
            list_of_series.append(series)
        return pd.DataFrame(list_of_series)

    def explode(self, attribute, unpack=False, connector=".", prefix=True):
        """Implement an "explode" function for nested listed objects.

        Returns an ``ExplodedCollection`` over ``attribute``. With
        ``unpack=True`` that result is additionally wrapped in an
        ``UnpackedCollection`` using ``connector`` and ``prefix``.
        """
        exploded = ExplodedCollection(self, attribute)
        if unpack:
            return UnpackedCollection(exploded, attribute, connector, prefix)
        return exploded

    def unpack(self, attribute, connector=".", prefix=True):
        """Implement an "unpack" function for nested single objects"""
        return UnpackedCollection(self, attribute, connector, prefix)

    # Values
    def values(self, *fields, **kw_fields):
        """Return a Collection that will return a Row object for each item with a subset of attributes

        Positional arguments will result in Row objects where the fields match the field names on the item,
        keyword arguments can be used to rename attributes. When passed as key=field, the resulting dictionary will have key: item[field]
        """
        return ValuesCollection(self, *fields, **kw_fields)

    def values_list(self, *fields, flat=False, **kw_fields):
        """Return a Collection that will return tuples for each item with a subset of attributes.

        If only a single field is passed, the keyword argument "flat" can be passed to return a simple list
        """
        return ValuesListCollection(self, *fields, flat=flat, **kw_fields)
class ListCollection(list, Collection):
    """A Collection that is also a real ``list``.

    Integer indexing returns the stored item; slicing returns another
    ``ListCollection`` so the Collection helpers stay available.
    """

    def __getitem__(self, sl):
        # Delegate to list.__getitem__ directly instead of copying the whole
        # list first; the result is the same but an index lookup stays O(1).
        result = super().__getitem__(sl)
        if isinstance(sl, slice):
            # list slicing yields a plain list, so re-wrap it.
            return ListCollection(result)
        return result
class ExplodedCollection(Collection):
    """Lazily yield one row per element of a list-valued attribute.

    For every source row, ``attribute`` is looked up with ``resolve`` and each
    element of the result produces a copy of the row in which
    ``row[attribute]`` is replaced by that single element.
    """

    def __init__(self, iterable, attribute):
        self.iterable = iterable
        self.attribute = attribute

    def __iter__(self):
        for row in self.iterable:
            explode_field = resolve(row, self.attribute)
            for item in explode_field:
                # Rows that expose to_dict() are copied through it, as before.
                # Rows without it (e.g. plain mappings - presumably what an
                # earlier explode step yields; confirm against Row.to_dict)
                # are copied with dict() instead of raising AttributeError.
                to_dict_method = getattr(row, "to_dict", None)
                new_row = to_dict_method() if callable(to_dict_method) else dict(row)
                new_row[self.attribute] = item
                yield new_row
class UnpackedCollection(Collection):
    """Lazily flatten a mapping-valued attribute into its parent row.

    Each yielded ``Row`` holds the source row's entries plus one entry per
    key of the nested mapping; the original ``attribute`` entry is removed.
    """

    def __init__(self, iterable, attribute, connector=".", prefix=True):
        self.iterable, self.attribute = iterable, attribute
        self.connector, self.prefix = connector, prefix

    def item_key(self, k):
        """Return the output key for nested key ``k``.

        With ``prefix`` enabled the key becomes
        ``<attribute><connector><k>``; otherwise ``k`` is returned unchanged.
        """
        if self.prefix:
            return f"{self.attribute}{self.connector}{k}"
        return k

    def __iter__(self):
        for source in self.iterable:
            nested = resolve(source, self.attribute)
            flattened = {}
            for key, value in nested.items():
                flattened[self.item_key(key)] = value
            # Nested entries are merged after the source entries, so they win
            # on key clashes; the container attribute is dropped afterwards.
            merged = Row({**source, **flattened})
            del merged[self.attribute]
            yield merged
class ValuesCollection(Collection):
    """Lazily project each item of a source collection onto selected fields.

    Iterating yields one ``AttrDict`` per source item, mapping each output
    name to ``resolve(item, path)``.
    """

    def __init__(self, Collection, *arg_fields, fields=None, **kw_fields):
        """Store the source and build the ``output name -> path`` mapping.

        :param Collection: the source iterable of items.
        :param arg_fields: paths used both as output name and lookup path.
        :param fields: optional mapping of ``output name -> path``; applied
            last, so it overrides the other two sources on name clashes.
        :param kw_fields: ``output_name=path`` pairs.
        """
        self.Collection = Collection
        # ``None`` sentinel instead of a shared mutable ``dict()`` default.
        extra_fields = {} if fields is None else fields
        self.fields = {
            **{name: name for name in arg_fields},
            **kw_fields,
            **extra_fields,
        }

    def __iter__(self):
        for item in self.Collection:
            yield AttrDict((k, resolve(item, v)) for k, v in self.fields.items())

    def __getitem__(self, sl):
        # NOTE(review): this indexes a deep copy of the *source* collection,
        # so the result is the source's own item/slice, not the projected
        # field subset that iteration produces - confirm this is intended.
        clone = deepcopy(self)
        return clone.Collection.__getitem__(sl)
class ValuesListCollection(ValuesCollection):
    """Variant of ``ValuesCollection`` that yields tuples instead of mappings.

    With ``flat=True`` and a single selected field, the bare value is yielded
    instead of a one-element tuple.
    """

    def __init__(self, Collections, *fields, flat=False, **kw_fields):
        super().__init__(Collections, *fields, **kw_fields)
        self.flat = flat

    def __iter__(self):
        # This is a generator, so the check runs when iteration starts, not
        # when the collection is constructed.
        if self.flat and len(self.fields) > 1:
            raise ValueError("Flat only works with 1 field!")
        for record in super().__iter__():
            values = tuple(record.values())
            if self.flat:
                yield values[0]
            else:
                yield values