.. index:: Quick Start
Quick Start
===========
Get started with Graphene:
1. `Authentication <#authentication>`__
2. `S3 Uploads <#s3-uploads>`__
3. `Construct a business model <#construct-a-business-model>`__
4. `Request an
analysis <#request-an-analysis-and-retrieve-the-results>`__
5. `Request an async analysis <#request-an-analysis-and-retrieve-the-results-from-s3-asynchronously>`__
**Note**: Code examples below are given in **Python**.
.. _authentication:
Authentication
--------------
Graphene User Account Credentials
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
**Graphene user account credentials** are provided by Analyze Re for
accessing Graphene. They take the form of an Amazon Cognito user and are
unique per user.
The Graphene user account credentials consist of five parts: **username**,
**password**, **user_pool_id**, **client_id**, and **pool_region**.

**user_pool_id:** the ID of an Amazon Cognito user pool,
e.g. ``us-east-1_fpijivb8z``

**client_id:** the ID of a client app registered in the Cognito user pool,
e.g. ``1ameln1vm234htpq11238rmpol``

**pool_region:** the AWS region of the Cognito user pool, e.g. ``us-east-1``
S3 Authentication
~~~~~~~~~~~~~~~~~
S3 authentication is a two-step process. An authorization token (the **ID
token**) is first obtained using the **Graphene user account credentials**.
That token is then exchanged for **temporary S3 access credentials**.
These credentials expire after one hour but can be refreshed indefinitely;
a brief refresh sketch follows the authentication example below.

The **temporary S3 access credentials** include an ``aws_access_key_id``,
an ``aws_secret_access_key``, and an ``aws_session_token``.
**Note:** Graphene uses two types of authorization tokens: the **ID token**
and the **access token**.

* The **ID token** is used to obtain the hostname, bucket, and temporary S3 access credentials;
  see the `example <#authenticating-using-cognito-to-access-an-s3-bucket>`__ below.
* The **access token** is used to connect to Graphene gRPC services;
  see the example in `Graphene authentication <#graphene-authentication>`__.
Authenticating Using Cognito to Access an S3 Bucket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Modify the Graphene user account credentials in this example to match
your own.
.. code:: python
import boto3
from warrant_lite import WarrantLite
from jose import jwt
###########################################################################################
# Modify the following fields of the Graphene user account credentials to your credentials:
###########################################################################################
graphene_creds = {
"region": "some_region",
"user_pool_id": "some_user_pool_id",
"client_id": "some_client_id",
"username": "some_username",
"password": "some_password"
}
###########################################################################################
cognito = WarrantLite(
username=graphene_creds["username"],
password=graphene_creds["password"],
pool_id=graphene_creds["user_pool_id"],
client_id=graphene_creds["client_id"],
client=boto3.client(
"cognito-idp", region_name=graphene_creds["region"], aws_access_key_id="", aws_secret_access_key=""
),
)
tokens = cognito.authenticate_user()
# access token can be used to connect Graphene gRPC services.
access_token = tokens["AuthenticationResult"]["AccessToken"]
# id token can be used to get hostname, bucket, and temporary S3 access credentials
id_token = tokens["AuthenticationResult"]["IdToken"]
# the hostname of the Graphene gRPC service
hostname = jwt.get_unverified_claims(id_token)["custom:hostname"]
# the name of the S3 bucket associated with the Cognito user
bucket_name = jwt.get_unverified_claims(id_token)["custom:bucket"]
identity_pool_id = jwt.get_unverified_claims(id_token)["custom:identity_pool_id"]
cognito_identity_client = boto3.client(
"cognito-identity", region_name=graphene_creds["region"], aws_access_key_id="", aws_secret_access_key=""
)
provider_name = f'cognito-idp.{graphene_creds["region"]}.amazonaws.com/{graphene_creds["user_pool_id"]}'
identity_id = cognito_identity_client.get_id(IdentityPoolId=identity_pool_id, Logins={provider_name: id_token})
aws_creds = cognito_identity_client.get_credentials_for_identity(
IdentityId=identity_id["IdentityId"], Logins={provider_name: id_token}
)
s3_resource = boto3.resource(
"s3",
aws_access_key_id=aws_creds["Credentials"]["AccessKeyId"],
aws_secret_access_key=aws_creds["Credentials"]["SecretKey"],
aws_session_token=aws_creds["Credentials"]["SessionToken"],
)
print(f"Bucket keys: {[ obj.key for obj in s3_resource.Bucket(bucket_name).objects.all()]}")
S3 Uploads
----------
First, a couple of relevant S3 terms and definitions:
1. A **bucket** is a container for storing files.
2. A **key** is a string which uniquely identifies a file in an S3 bucket.
3. A **key prefix** is an initial substring of a key which may be common
   among multiple keys, implicitly grouping them into a collection. (The
   **key prefix** may be thought of as a **directory** on a filesystem.)
Each Graphene user account gets a unique, isolated bucket. The bucket
name can be obtained through the above
`example <#authenticating-using-cognito-to-access-an-s3-bucket>`__.
Input parquet files must be uploaded with a key having **at minimum** a
key prefix of ``uploads/ledgers/``. Longer prefixes may be used to group
files further. See :doc:`Ledger Format ` for details of the Graphene
input data format.
**Note:** Once files are uploaded under a given key, **they must not be
modified or deleted at that key.** To upload modified data, users must
upload to a new key (again with a minimum key prefix of
``uploads/ledgers/``).
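As an illustration, the sketch below uploads a local parquet file under the
required ``uploads/ledgers/`` prefix and then lists the uploaded files. It
reuses ``s3_resource`` and ``bucket_name`` from the authentication example
above; the local file name and the ``uploads/ledgers/quickstart/`` prefix
are hypothetical.

.. code:: python

   # upload a local parquet file under the required uploads/ledgers/ prefix;
   # the file name and prefix are examples only
   local_file = "my_losses.parquet"
   key = "uploads/ledgers/quickstart/my_losses.parquet"
   s3_resource.Bucket(bucket_name).upload_file(local_file, key)

   # list everything stored under the chosen key prefix
   for obj in s3_resource.Bucket(bucket_name).objects.filter(Prefix="uploads/ledgers/quickstart/"):
       print(obj.key)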
Construct a business model
--------------------------
Construct a basic business model using the :doc:`Graphene Data
API `.
Graphene authentication
~~~~~~~~~~~~~~~~~~~~~~~
The **access token** obtained while authenticating with S3 (see the
`example <#authenticating-using-cognito-to-access-an-s3-bucket>`__) can
be provided directly with Graphene gRPC requests without any further
steps. It is provided with every request to the :doc:`Graphene Data
API ` or the :doc:`Graphene Analysis
API `.
Create a network as a business model
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This example shows how to create a network with a loss set node as a
business model using the :doc:`Graphene Data API `. See the data model
documentation for details of the business model. A loss set node allows
Graphene to access input data. Graphene input data can be either a single
parquet file or a collection of parquet files, so the Graphene input data
path can be either a full S3 path that ends with ``.parquet``
(e.g. ``s3://<bucket_name>/uploads/ledgers/<...a_single_file_name>.parquet``)
or a full S3 path that ends with a slash (``/``) delimiter
(e.g. ``s3://<bucket_name>/uploads/ledgers/<...some_prefix_name>/``).
This example continues using ``bucket_name``, ``hostname``, and
``access_token`` from the above
`example <#authenticating-using-cognito-to-access-an-s3-bucket>`__. The
``hostname`` and ``access_token`` are used to create a gRPC channel,
which provides the connection to the :doc:`Graphene Data API `
or the :doc:`Graphene Analysis API `.
You will need to modify ``input_data_path`` to be the path of your
data uploaded in `S3 Uploads <#s3-uploads>`__:
.. code:: python
import json
import grpc
from proto.public import data_management_api_pb2, data_management_api_pb2_grpc
#######################################################################
# Modify the s3 path below to match where your data have been uploaded:
#######################################################################
input_data_path = f"s3://{bucket_name}/"
#######################################################################
def grpc_keepalive_options(keepalive_seconds):
options = [
("grpc.keepalive_time_ms", keepalive_seconds * 1000),
("grpc.keepalive_permit_without_calls", 1),
("grpc.http2.max_pings_without_data", 0),
]
# the gRPC default minimum ping interval is 5 minutes, so if this is less, we also need to lower the minimum
grpc_default_minimum_s = 5 * 60
if keepalive_seconds < grpc_default_minimum_s:
options.append(("grpc.http2.min_time_between_pings_ms", keepalive_seconds * 1000))
return options
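# combine TLS channel credentials with per-call credentials so the Cognito
# access token is attached to every request made on the channel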
def get_channel_config(access_token):
channel_creds = grpc.ssl_channel_credentials()
call_creds = grpc.access_token_call_credentials(access_token)
creds = grpc.composite_channel_credentials(channel_creds, call_creds)
channel_args = [creds, grpc_keepalive_options(60)]
return channel_args
def create_channel(access_token, hostname):
channel_opts = get_channel_config(access_token)
return grpc.secure_channel("{}:{}".format(hostname, 443), *channel_opts)
# reuse the access_token and hostname to create a channel
channel = create_channel(access_token, hostname)
# connect to Data API
data_management_stub = data_management_api_pb2_grpc.DataManagementStub(channel)
try:
# create a loss set node with specified input data path
loss_set_properties = {
"_schema": "LossSet_1.0",
"path": input_data_path,
"occurrence_key_column": "EventId",
"label": f"LossSet[v1]",
}
# the node properties are JSON-encoded and sent with the CreateNode request
node_ref = data_management_stub.CreateNode(
data_management_api_pb2.CreateNodeRequest(
properties=data_management_api_pb2.Properties(
data=json.dumps(loss_set_properties).encode(), encoding=data_management_api_pb2.Properties.JSON
)
),
).reference
print(f"Loss set node created: {node_ref}")
# create a network consisting of just the singular node
network_ref = data_management_stub.CreateNetwork(
data_management_api_pb2.CreateNetworkRequest(node_references=[node_ref],
),
).reference
print(f"Network created: {network_ref}")
except grpc.RpcError as ex:
print(f"Data API request failed: {ex.code()}: {ex.details()}")
finally:
channel.close()
.. _request-analysis-example:
Request an analysis and retrieve the results
--------------------------------------------
Request an analysis on the business model created in the above
`example <#create-a-network-as-a-business-model>`__ using the :doc:`Graphene
Analysis API `. This example continues using
``hostname`` and ``access_token`` from the
`example <#authenticating-using-cognito-to-access-an-s3-bucket>`__ to
create a gRPC channel to connect to the :doc:`Graphene Analysis
API `.
.. code:: python
import grpc
from proto.public import (
analysis_api_pb2, analysis_api_pb2_grpc, analysis_pb2, common_pb2
)
# reuse the access_token and hostname to create a channel
channel = create_channel(access_token, hostname)
# connect to Analysis API
analysis_api_stub = analysis_api_pb2_grpc.AnalysisStub(channel)
try:
# execute an analysis request with distribution metrics.
results = analysis_api_stub.RunAnalysis(
analysis_api_pb2.NetworkAnalysisStreamRequest(
trial_range=common_pb2.TrialRange(trial_begin=1, trial_count=5),
network_analysis_requests=[
analysis_api_pb2.NetworkAnalysisRequest(
reference=network_ref,
context=analysis_api_pb2.Context(template_search_paths=["financial_model"]),
distribution_metrics=[
analysis_pb2.DistributionMetricsDescriptor(
aggregation=analysis_pb2.AggregationDescriptor(aggregation_method="AEP"),
windows=[analysis_pb2.ProbabilityWindow(min_probability=0.0, max_probability=1.0)],
)
],
)
],
),
)
# get analysis request id from gRPC header.
request_id = dict(results.initial_metadata()).get("x-graphene-request-id", "UNDEFINED")
print(f"Analysis request id: {request_id}")
# each result from the stream results can be one of header, error or result.
for result in results:
if result.WhichOneof("data") == "header":
print(f"Received header: {result.header}")
elif result.WhichOneof("data") == "result":
print(f"Analysis API: received result: {result.result}")
elif result.WhichOneof("data") == "error":
print(f"Analysis API: received error: {result.error}")
except grpc.RpcError as ex:
print(f"Analysis API: request failed: {ex.code()}: {ex.details()}")
finally:
channel.close()
Request an analysis and retrieve the results from S3 asynchronously
-------------------------------------------------------------------
Request an asynchronous analysis on the business model created in the above
`example <#create-a-network-as-a-business-model>`__ using the :doc:`Graphene
Analysis API `. This example continues using
``hostname`` and ``access_token`` from the
`example <#authenticating-using-cognito-to-access-an-s3-bucket>`__ to
create a gRPC channel to connect to the :doc:`Graphene Analysis API
`.
.. code:: python
import grpc
import boto3
from urllib.parse import urlparse
from google.rpc import status_pb2
from google.protobuf import json_format
from proto.public import (
analysis_api_pb2, analysis_api_pb2_grpc, analysis_pb2, common_pb2
)
# reuse the access_token and hostname to create a channel
channel = create_channel(access_token, hostname)
# connect to Analysis API
analysis_api_stub = analysis_api_pb2_grpc.AnalysisStub(channel)
try:
# execute an analysis request with distribution metrics.
reply, call = analysis_api_stub.StartAsyncAnalysis.with_call(
analysis_api_pb2.AsyncAnalysisRequest(
trial_range=common_pb2.TrialRange(trial_begin=1, trial_count=5),
network_analysis_requests=[
analysis_api_pb2.NetworkAnalysisRequest(
reference=network_ref,
context=analysis_api_pb2.Context(template_search_paths=["financial_model"]),
distribution_metrics=[
analysis_pb2.DistributionMetricsDescriptor(
aggregation=analysis_pb2.AggregationDescriptor(aggregation_method="AEP"),
windows=[analysis_pb2.ProbabilityWindow(min_probability=0.0, max_probability=1.0)],
)
],
)
],
),
)
# get analysis results s3 location.
results_uri = reply.results_uri
parsed = urlparse(results_uri)
bucket_name = parsed.netloc
key = parsed.path.lstrip('/')
print(f"Analysis results bucket: {bucket_name}, key: {key}")
# read request status from s3. repeat this until status is COMPLETE or ERROR.
status_key = f"{key}analysis_status.json"
# this uses the default AWS credential chain; alternatively, reuse the
# temporary S3 access credentials obtained in the authentication example
s3 = boto3.resource("s3")
s3_object = s3.Object(bucket_name, status_key).get()
status_message = json_format.Parse(s3_object["Body"].read(), analysis_api_pb2.AsyncNetworkAnalysisReply())
if status_message.status.code == common_pb2.Status.Code.ERROR:
# read error details from s3.
error_key = f"{key}error_details"
s3_object = s3.Object(bucket_name, error_key).get()
error = status_pb2.Status()
error.ParseFromString(s3_object["Body"].read())
print(f"Analysis API: async request processing failed: {error}")
elif status_message.status.code == common_pb2.Status.Code.COMPLETE:
bucket = s3.Bucket(bucket_name)
results_key = f"{key}analysis_result_"
results = list(bucket.objects.filter(Prefix=results_key))
for s3_object in results:
message = analysis_api_pb2.NetworkAnalysisStreamReply()
message.ParseFromString(s3_object.get()["Body"].read())
if message.HasField("result"):
print(f"Analysis API: received result: {message.result}")
except grpc.RpcError as ex:
print(f"Analysis API: async request failed: {ex.code()}: {ex.details()}")
finally:
channel.close()
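The status read above is a single check. In practice the status object is
polled until it reports ``COMPLETE`` or ``ERROR``; the following is a
minimal polling sketch using the ``s3`` resource, ``bucket_name``, ``key``,
and message types from the example above. The poll interval and the handling
of a not-yet-written status object via ``botocore.exceptions.ClientError``
are assumptions.

.. code:: python

   import time

   from botocore.exceptions import ClientError

   def wait_for_analysis(s3, bucket_name, key, poll_seconds=10):
       # poll the analysis status object until it reports COMPLETE or ERROR
       status_key = f"{key}analysis_status.json"
       while True:
           try:
               body = s3.Object(bucket_name, status_key).get()["Body"].read()
           except ClientError:
               # the status object has not been written yet; try again later
               time.sleep(poll_seconds)
               continue
           status = json_format.Parse(body, analysis_api_pb2.AsyncNetworkAnalysisReply())
           if status.status.code in (common_pb2.Status.Code.COMPLETE, common_pb2.Status.Code.ERROR):
               return status
           time.sleep(poll_seconds)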