Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion exabel/client/api/data_classes/folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,5 @@ def __eq__(self, other: object) -> bool:
def __repr__(self) -> str:
return (
f"Folder(name='{self.name}', display_name='{self.display_name}', "
f"write={self.write}, items={self.items}, description={repr(self.description)})"
f"write={self.write}, items={self.items}, description={self.description!r})"
)
5 changes: 2 additions & 3 deletions exabel/client/api/data_classes/time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ def _series_to_time_series_points(series: pd.Series) -> Sequence[TimeSeriesPoint
"""Convert a pandas Series to a sequence of TimeSeriesPoint."""
points = []
for index, value in series.items():
# Logic from _pandas_timestamp_to_proto has been inlined here for performance.
point = TimeSeriesPoint()
if isinstance(index, tuple):
# (timestamp, known_time)
Expand All @@ -185,12 +184,12 @@ def _series_to_time_series_points(series: pd.Series) -> Sequence[TimeSeriesPoint
f"two elements: (timestamp, known_time), but got {index}"
)
timestamp = index[0]
point.known_time.seconds = index[1].value // 1_000_000_000
point.known_time.seconds = int(index[1].timestamp())
else:
timestamp = index
if timestamp is pd.NaT:
raise ValueError("Timestamp must be set")
point.time.seconds = timestamp.value // 1_000_000_000
point.time.seconds = int(timestamp.timestamp())
point.value.value = value
points.append(point)
return points
Expand Down
308 changes: 307 additions & 1 deletion exabel/client/api/export_api.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging
import pickle
from time import time
Expand Down Expand Up @@ -105,6 +106,311 @@ def _to_column(item: str | Column | DerivedSignal) -> str | Column:
return Column(name=item.label, expression=item.expression)
return item # str or Column pass through unchanged

@staticmethod
def _to_timestamp_string(value: str | pd.Timestamp) -> str:
"""Convert a timestamp value to an ISO 8601 string suitable for the v2 JSON request."""
if isinstance(value, pd.Timestamp):
if value.tzinfo is None:
return value.isoformat() + "Z"
return value.tz_convert("UTC").strftime("%Y-%m-%dT%H:%M:%SZ")
return value if "T" in value else value + "T00:00:00Z"

@staticmethod
def _build_v2_signals(
    signal: str | Column | DerivedSignal | Sequence[str | Column | DerivedSignal],
) -> list[dict[str, str]]:
    """Translate the ``signal`` argument into the signal specifications of the v2 JSON body.

    A single str, Column or DerivedSignal is treated as a one-element sequence.

    Args:
        signal: one signal, or a sequence of signals. Each is turned into a dict:
            - DerivedSignal: ``{"label": label, "expression": expression}``.
            - Column: ``{"label": name}``, plus ``"expression"`` when the column has one.
            - anything else (a string): ``{"label": <the string>}``.

    Returns:
        One dict per requested signal, in the order given.

    Raises:
        ValueError: if a DerivedSignal lacks a label or an expression.
    """
    if isinstance(signal, (str, Column, DerivedSignal)):
        requested = [signal]
    else:
        requested = signal

    specs: list[dict[str, str]] = []
    for spec in requested:
        if isinstance(spec, DerivedSignal):
            if not (spec.label and spec.expression):
                raise ValueError(
                    f"DerivedSignal must have both label and expression, "
                    f"got label={spec.label!r}, expression={spec.expression!r}"
                )
            specs.append({"label": spec.label, "expression": spec.expression})
            continue
        if isinstance(spec, Column):
            column_spec: dict[str, str] = {"label": spec.name}
            if spec.expression:
                column_spec["expression"] = spec.expression
            specs.append(column_spec)
            continue
        # A plain string is a label referring to a signal by name.
        specs.append({"label": spec})
    return specs

def _post_v2_signals(
    self,
    signals: list[dict[str, str]],
    *,
    entities: list[str] | None = None,
    tags: list[str] | None = None,
    start_time: str | pd.Timestamp | None = None,
    end_time: str | pd.Timestamp | None = None,
    version: str | pd.Timestamp | None = None,
    output_format: str = "pickle",
) -> bytes:
    """POST a structured JSON request to the v2 export signals endpoint.

    Args:
        signals: signal specifications, sent as the "signals" field.
        entities: sent as "entities" when non-empty.
        tags: sent as "tags" when non-empty.
        start_time: sent as "timeRange.from" when truthy.
        end_time: sent as "timeRange.to" when truthy.
        version: sent as "version" when not None.
        output_format: sent as "outputFormat".

    Returns:
        The raw response bytes in the requested format.

    Raises:
        ValueError: if the response status code is not 200. The message contains the status
            code and the response body, with one pair of surrounding double quotes removed.
    """
    body: dict = {
        "signals": signals,
        "outputFormat": output_format,
    }
    if entities:
        body["entities"] = entities
    if tags:
        body["tags"] = tags
    time_range: dict[str, str] = {}
    if start_time:
        time_range["from"] = self._to_timestamp_string(start_time)
    if end_time:
        time_range["to"] = self._to_timestamp_string(end_time)
    # Only include "timeRange" when at least one bound was given.
    if time_range:
        body["timeRange"] = time_range
    if version is not None:
        body["version"] = self._to_timestamp_string(version)

    url = f"https://{self._backend}/v2/export/signals"
    start = time()
    logger.info("Sending v2 signal query: %s", body)
    response = self._session.post(
        url, data=json.dumps(body), headers={"Content-Type": "application/json"}, timeout=600
    )
    spent_time = time() - start
    logger.info(
        "v2 query completed in %.1f seconds, received %d bytes, status %d",
        spent_time,
        len(response.content),
        response.status_code,
    )
    if response.status_code == 200:
        return response.content
    # Decode leniently: an error body that is not valid UTF-8 must not raise
    # UnicodeDecodeError and hide the status code we are about to report.
    error_message = response.content.decode(errors="replace")
    if error_message.startswith('"') and error_message.endswith('"'):
        error_message = error_message[1:-1]
    raise ValueError(f"Got {response.status_code}: {error_message}")

def run_signal_query_v2(
    self,
    signals: list[dict[str, str]],
    *,
    entities: list[str] | None = None,
    tags: list[str] | None = None,
    start_time: str | pd.Timestamp | None = None,
    end_time: str | pd.Timestamp | None = None,
    version: str | pd.Timestamp | None = None,
) -> pd.DataFrame:
    """Run a v2 signal export query and return the server's DataFrame without reshaping.

    Choose this method when the complete multi-level column structure is needed, for
    instance to read Bloomberg tickers or time series labels from the column headers.
    signal_query_v2() offers a simpler interface that returns flat columns.

    Layout of the returned DataFrame:
        - Rows are numbered by a RangeIndex.
        - The first column holds the timestamps (its header is a tuple of level names).
        - All other columns form a MultiIndex with levels
          (Signal, Entity, Bloomberg ticker, Time series); each column is one
          (signal, entity, time series) combination.

    Example::

        df = export_api.run_signal_query_v2(
            signals=[
                {"label": "Popularity", "expression": "graph_signal('ns.popularity')"},
                {"label": "Revenue"},
            ],
            entities=["entityTypes/company/entities/F_000C7F-E"],
            start_time="2024-01-01",
            end_time="2024-12-31",
        )

    Args:
        signals: signal specifications; each is a dict with a 'label' and, optionally, an
            'expression'. A dict with only 'label' must name a derived signal in the
            library. When 'expression' is present it is evaluated as a DSL expression
            and 'label' becomes the column header.
        entities: entity resource names
            (e.g., ["entityTypes/company/entities/F_000C7F-E"]).
        tags: tag resource names (e.g., ["tags/user:123"]).
        start_time: start of the time range.
        end_time: end of the time range.
        version: point-in-time version at which the signals are evaluated.
    """
    payload = self._post_v2_signals(
        signals,
        output_format="pickle",
        entities=entities,
        tags=tags,
        start_time=start_time,
        end_time=end_time,
        version=version,
    )
    # NOTE(review): unpickling can execute arbitrary code, so this is only safe as long as
    # the configured backend is trusted.
    frame = pickle.loads(payload)
    assert isinstance(frame, pd.DataFrame)
    return frame

@staticmethod
def _reshape_v2_response(
    df: pd.DataFrame,
    multi_entity: bool,
) -> pd.DataFrame:
    """Reshape a v2 response DataFrame to the signal_query format.

    The v2 response has MultiIndex columns (Signal, Entity, Bloomberg ticker, Time series)
    with the first column containing timestamps and a RangeIndex for rows.

    Column naming: a signal backed by a single column keeps the signal label as its column
    name; a signal backed by several columns yields one column per time series, named
    ``"{signal}/{time series}"`` using the last column level.

    Args:
        df: the response frame; column 0 holds the timestamps, the rest hold data.
        multi_entity: whether the result should be indexed by (name, time) rather than
            by time only. Only honoured when the columns have at least four levels.

    Returns:
        - If the data columns are not a MultiIndex: the data columns, indexed by "time".
        - If ``multi_entity`` and an entity level is present: a frame indexed by
          (name, time) with the rows of each entity stacked in order of first appearance.
          When the response has no data columns, an empty frame with that index is
          returned.
        - Otherwise: a frame indexed by "time".
    """
    time_values = pd.DatetimeIndex(df.iloc[:, 0])
    data_df = df.iloc[:, 1:]

    if not isinstance(data_df.columns, pd.MultiIndex):
        data_df = data_df.copy()
        data_df.index = time_values
        data_df.index.name = "time"
        return data_df

    columns = data_df.columns
    signal_labels = columns.get_level_values(0)
    ts_names = columns.get_level_values(columns.nlevels - 1)
    # dict.fromkeys de-duplicates while keeping first-appearance order.
    unique_signals = list(dict.fromkeys(signal_labels))

    def flatten(column_mask=None) -> pd.DataFrame:
        """Build a time-indexed frame with one flat column per selected data column."""
        flat = pd.DataFrame(index=time_values)
        for sig in unique_signals:
            sig_mask = signal_labels == sig
            if column_mask is not None:
                sig_mask = sig_mask & column_mask
            sig_cols = data_df.loc[:, sig_mask]
            if sig_cols.shape[1] == 0:
                continue
            if sig_cols.shape[1] == 1:
                flat[sig] = sig_cols.iloc[:, 0].values
            else:
                for i, ts_name in enumerate(ts_names[sig_mask]):
                    flat[f"{sig}/{ts_name}"] = sig_cols.iloc[:, i].values
        return flat

    if columns.nlevels >= 4 and multi_entity:
        entity_names = columns.get_level_values(1)
        pieces = []
        for entity in dict.fromkeys(entity_names):
            entity_df = flatten(entity_names == entity)
            entity_df["__entity__"] = entity
            pieces.append(entity_df)

        if not pieces:
            # No data columns at all (e.g. nothing matched the query): pd.concat would
            # raise ValueError on an empty list, so return an empty (name, time) frame.
            empty_index = pd.MultiIndex.from_arrays(
                [pd.Index([], dtype=object), time_values[:0]], names=["name", "time"]
            )
            return pd.DataFrame(index=empty_index)

        result = pd.concat(pieces)
        result.index.name = "time"
        result = result.reset_index().set_index(["__entity__", "time"])
        result.index.names = ["name", "time"]
        return result

    # Single entity or no entity level
    result = flatten()
    result.index.name = "time"
    return result

def signal_query_v2(
    self,
    signal: str | Column | DerivedSignal | Sequence[str | Column | DerivedSignal],
    *,
    resource_name: str | Sequence[str] | None = None,
    tag: str | Sequence[str] | None = None,
    start_time: str | pd.Timestamp | None = None,
    end_time: str | pd.Timestamp | None = None,
    version: str | pd.Timestamp | None = None,
) -> pd.Series | pd.DataFrame:
    """Query one or more signals through the v2 export endpoint.

    In contrast to signal_query(), the v2 export API supports multi-timeseries signals
    (e.g., expressions using for_type() that return one time series per sub-entity).
    Entities must be given by resource_name or tag, not by bloomberg_ticker or factset_id.

    A multi-timeseries signal produces one column per time series, named
    ``"{signal_label}/{time_series_name}"`` (e.g., ``"Visits/domain1.com"``).

    To get the full server response with multi-level column headers (including Bloomberg
    tickers and entity names), call run_signal_query_v2() instead.

    Example::

        result = export_api.signal_query_v2(
            DerivedSignal(
                name=None,
                label="sweb_visits",
                expression="data('similarweb.all_visits').for_type('similarweb.domain')",
            ),
            resource_name="entityTypes/company/entities/F_000C7F-E",
            start_time="2024-01-01",
            end_time="2024-12-31",
        )

    Args:
        signal: the signal(s) to retrieve. A string is taken as the label of a signal in
            the library; Column and DerivedSignal objects carry a DSL expression together
            with a label. At least one signal is required.
        resource_name: an Exabel resource name such as
            "entityTypes/company/entities/F_000C7F-E", or a list of such names.
        tag: an Exabel tag resource name such as "tags/user:123", or a list of such names.
        start_time: the first date to retrieve data for.
        end_time: the last date to retrieve data for.
        version: the point-in-time at which to evaluate the signals.

    Returns:
        A pandas Series when the result is a single time series, otherwise a pandas
        DataFrame. With a single entity the index is a DatetimeIndex; with multiple
        entities or a tag it is a MultiIndex with entity on the first level and time
        on the second.

    Raises:
        ValueError: if ``signal`` is empty.
    """
    if not signal:
        raise ValueError("Must specify signal to retrieve")

    signal_specs = self._build_v2_signals(signal)

    entity_names: list[str] | None = None
    if resource_name is not None:
        entity_names = [resource_name] if isinstance(resource_name, str) else list(resource_name)
    tag_names: list[str] | None = None
    if tag is not None:
        tag_names = [tag] if isinstance(tag, str) else list(tag)

    # A tag may expand to several entities, so it always selects the (name, time) layout.
    multi_entity = tag_names is not None or (
        entity_names is not None and len(entity_names) > 1
    )

    # run_signal_query_v2 sends the same request and unpickles the response for us.
    raw_frame = self.run_signal_query_v2(
        signal_specs,
        entities=entity_names,
        tags=tag_names,
        start_time=start_time,
        end_time=end_time,
        version=version,
    )
    reshaped = self._reshape_v2_response(raw_frame, multi_entity=multi_entity)
    return reshaped.squeeze(axis=1).infer_objects()

def signal_query(
self,
signal: str | Column | DerivedSignal | Sequence[str | Column | DerivedSignal],
Expand Down Expand Up @@ -220,7 +526,7 @@ def signal_query(
df = self.run_query(query.sql())

# Set the row index
df.set_index([col.name for col in index], inplace=True)
df = df.set_index([col.name for col in index])
# Squeeze to a Series if a single time series was returned,
# and fix the data type (the backend returns a DataFrame with dtype=object)
return df.squeeze(axis=1).infer_objects()
Expand Down
2 changes: 1 addition & 1 deletion exabel/client/api/time_series_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ def _pandas_timestamp_to_proto(
"""
if timestamp is None:
return None
return timestamp_pb2.Timestamp(seconds=timestamp.value // 1000000000)
return timestamp_pb2.Timestamp(seconds=int(timestamp.timestamp()))

@staticmethod
def _handle_time_series_response(
Expand Down
Empty file added exabel/py.typed
Empty file.
4 changes: 2 additions & 2 deletions exabel/scripts/check_company_identifiers_in_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
subset=[args.identifier_column]
)
identifier_type = self._get_identifier_type(args.identifier_column, args.identifier_type)
df.rename(columns={args.identifier_column: identifier_type}, inplace=True)
df = df.rename(columns={args.identifier_column: identifier_type})
identifiers = df[identifier_type].drop_duplicates()
entity_resource_names = to_entity_resource_names(
client.entity_api,
Expand All @@ -118,7 +118,7 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
"identifiers in total"
)
if args.output_file:
df.rename(columns={args.identifier_column: identifier_type}, inplace=True)
df = df.rename(columns={args.identifier_column: identifier_type})
result_df = df.merge(checked_data_frame, how="left", on=identifier_type)
result_df.to_csv(args.output_file, index=False)
print(f"Identifier mappings written to {args.output_file}")
Expand Down
Loading