Quick Start¶
Get started with Graphene:
Note: Code examples below are given in Python.
Authentication¶
Graphene User Account Credentials¶
Graphene user account credentials are provided by Analyze Re for accessing Graphene. They take the form of an Amazon Cognito user and are unique per user.
The Graphene user account credentials consist of 5 parts: username, password, user_pool_id, client_id, and pool_region.
user_pool_id: ID of the Cognito user pool, e.g. us-east-1_fpijivb8z
client_id: ID of a client app in the Cognito user pool, e.g. 1ameln1vm234htpq11238rmpol
pool_region: region of the Cognito user pool, e.g. us-east-1
S3 Authentication¶
S3 authentication is a two-step process. An ID token is first obtained using the Graphene user account credentials. That token is then used to obtain temporary S3 access credentials. These credentials expire after one hour but can be refreshed indefinitely.
The temporary S3 access credentials consist of aws_access_key_id, aws_secret_access_key, and aws_session_token.
Note: Graphene uses two types of authorization tokens: the ID token and the access token.
The ID token can be used to get the hostname, bucket, and temporary S3 access credentials; see the example below.
The access token can be used to connect to Graphene gRPC services; see the example in Graphene Authentication.
Authenticating Using Cognito to Access an S3 Bucket¶
Modify the Graphene user account credentials in this example to match your own.
import boto3
from warrant_lite import WarrantLite
from jose import jwt
###########################################################################################
# Modify the following Graphene user account credential fields to match your own credentials:
###########################################################################################
graphene_creds = {
"region": "some_region",
"user_pool_id": "some_user_pool_id",
"client_id": "some_client_id",
"username": "some_username",
"password": "some_password"
}
###########################################################################################
cognito = WarrantLite(
    username=graphene_creds["username"],
    password=graphene_creds["password"],
    pool_id=graphene_creds["user_pool_id"],
    client_id=graphene_creds["client_id"],
    client=boto3.client(
        "cognito-idp", region_name=graphene_creds["region"], aws_access_key_id="", aws_secret_access_key=""
    ),
)
tokens = cognito.authenticate_user()
# access token can be used to connect Graphene gRPC services.
access_token = tokens["AuthenticationResult"]["AccessToken"]
# id token can be used to get hostname, bucket, and temporary S3 access credentials
id_token = tokens["AuthenticationResult"]["IdToken"]
# the hostname is the Graphene gRPC service hostname
hostname = jwt.get_unverified_claims(id_token)["custom:hostname"]
# the name of the S3 bucket that is associated with the Cognito user
bucket_name = jwt.get_unverified_claims(id_token)["custom:bucket"]
identity_pool_id = jwt.get_unverified_claims(id_token)["custom:identity_pool_id"]
cognito_identity_client = boto3.client(
"cognito-identity", region_name=graphene_creds["region"], aws_access_key_id="", aws_secret_access_key=""
)
provider_name = f'cognito-idp.{graphene_creds["region"]}.amazonaws.com/{graphene_creds["user_pool_id"]}'
identity_id = cognito_identity_client.get_id(IdentityPoolId=identity_pool_id, Logins={provider_name: id_token})
aws_creds = cognito_identity_client.get_credentials_for_identity(
    IdentityId=identity_id["IdentityId"], Logins={provider_name: id_token}
)
s3_resource = boto3.resource(
"s3",
aws_access_key_id=aws_creds["Credentials"]["AccessKeyId"],
aws_secret_access_key=aws_creds["Credentials"]["SecretKey"],
aws_session_token=aws_creds["Credentials"]["SessionToken"],
)
print(f"Bucket keys: {[ obj.key for obj in s3_resource.Bucket(bucket_name).objects.all()]}")
S3 Uploads¶
First, a couple of relevant S3 terms and definitions:
A bucket is a container for storing files.
A key is a string which uniquely identifies a file in an S3 bucket.
A key prefix is an initial substring of a key which may be common among multiple keys, implicitly grouping them into a collection. (The key prefix may be thought of as a directory on a filesystem.)
Each Graphene user account gets a unique, isolated bucket. The bucket name can be obtained through the above example.
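For illustration, the following sketch lists every key under a given prefix, reusing the s3_resource and bucket_name objects from the authentication example above; the uploads/ledgers/2024/ prefix is purely hypothetical.
# list every key that shares a given prefix; s3_resource and bucket_name
# come from the authentication example above
prefix = "uploads/ledgers/2024/"  # hypothetical prefix, adjust to your own layout
for obj in s3_resource.Bucket(bucket_name).objects.filter(Prefix=prefix):
    print(obj.key)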
Input parquet files must be uploaded with a key having at minimum a key prefix of uploads/ledgers/. Longer prefixes may be used to further group files. Check Ledger Format for details of the Graphene input data format.
Note: Once files are uploaded under a given key, they must not be modified or deleted at that key. To upload modified data, users must upload to a new key (again with a minimum key prefix of uploads/ledgers/).
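As a sketch of the upload step itself (assuming the s3_resource and bucket_name objects from the authentication example above; the local file name and the my_portfolio grouping are hypothetical), a parquet file can be uploaded under the required uploads/ledgers/ prefix with boto3:
# upload a local parquet file under the required uploads/ledgers/ key prefix;
# "losses.parquet" and the "my_portfolio" grouping are hypothetical placeholders
local_file = "losses.parquet"
upload_key = "uploads/ledgers/my_portfolio/losses.parquet"
s3_resource.Bucket(bucket_name).upload_file(Filename=local_file, Key=upload_key)
print(f"Uploaded {local_file} to s3://{bucket_name}/{upload_key}")
The resulting key can then be used as the input_data_path in the next section.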
Construct a business model¶
Construct a basic business model using the Graphene Data API.
Graphene authentication¶
The access token obtained while authenticating with S3 (see the example above) can be provided directly to Graphene gRPC requests without any further steps. It is provided with every request to the Graphene Data API or the Graphene Analysis API.
Create a network as a business model¶
This example shows how to create a network with a loss set node as a business model using the Graphene Data API. Check the data model for details of the business model. A loss set node allows Graphene to access input data. Graphene input data can either be a single parquet file or a collection of parquet files, so the Graphene input data path can either be a full S3 path that ends with .parquet (e.g. s3://<bucket_name>/uploads/ledgers/<...a_single_file_name>.parquet) or a full S3 path that ends with a slash (/) delimiter (e.g. s3://<bucket_name>/uploads/ledgers/<...some_prefix_name>/).
This example continues using bucket_name, hostname, and access_token from the above example. The hostname and access_token are used to create a gRPC channel which provides a connection to the Graphene Data API or the Graphene Analysis API.
You will need to modify input_data_path to be the path of your uploaded data from the S3 upload:
import json
import grpc
from proto.public import data_management_api_pb2, data_management_api_pb2_grpc
#######################################################################
# Modify the s3 path below to match where your data have been uploaded:
#######################################################################
input_data_path = f"s3://{bucket_name}/<key>"
#######################################################################
def grpc_keepalive_options(keepalive_seconds):
    options = [
        ("grpc.keepalive_time_ms", keepalive_seconds * 1000),
        ("grpc.keepalive_permit_without_calls", 1),
        ("grpc.http2.max_pings_without_data", 0),
    ]
    # the default minimum time between pings is 5 minutes, so if the keepalive
    # is less than that, we also need to lower the minimum
    grpc_default_minimum_s = 5 * 60
    if keepalive_seconds < grpc_default_minimum_s:
        options.append(("grpc.http2.min_time_between_pings_ms", keepalive_seconds * 1000))
    return options


def get_channel_config(access_token):
    channel_creds = grpc.ssl_channel_credentials()
    call_creds = grpc.access_token_call_credentials(access_token)
    creds = grpc.composite_channel_credentials(channel_creds, call_creds)
    channel_args = [creds, grpc_keepalive_options(60)]
    return channel_args


def create_channel(access_token, hostname):
    channel_opts = get_channel_config(access_token)
    return grpc.secure_channel("{}:{}".format(hostname, 443), *channel_opts)
# reuse the access_token and hostname to create a channel
channel = create_channel(access_token, hostname)
# connect to Data API
data_management_stub = data_management_api_pb2_grpc.DataManagementStub(channel)
try:
    # create a loss set node with the specified input data path
    loss_set_properties = {
        "_schema": "LossSet_1.0",
        "path": input_data_path,
        "occurrence_key_column": "EventId",
        "label": "LossSet[v1]",
    }
    # gRPC-level errors raise grpc.RpcError and are handled below
    node_ref = data_management_stub.CreateNode(
        data_management_api_pb2.CreateNodeRequest(
            properties=data_management_api_pb2.Properties(
                data=json.dumps(loss_set_properties).encode(), encoding=data_management_api_pb2.Properties.JSON
            )
        ),
    ).reference
    print(f"Loss set node created: {node_ref}")
    # create a network consisting of just the singular node
    network_ref = data_management_stub.CreateNetwork(
        data_management_api_pb2.CreateNetworkRequest(node_references=[node_ref]),
    ).reference
    print(f"Network created: {network_ref}")
except grpc.RpcError as ex:
    print(f"Data API request failed: {ex.code()}: {ex.details()}")
finally:
    channel.close()
Request an analysis and retrieve the results¶
Request an analysis on the business model created in the above example using the Graphene Analysis API. This example continues using hostname and access_token from the authentication example to create a gRPC channel to connect to the Graphene Analysis API.
import grpc
from proto.public import (
analysis_api_pb2, analysis_api_pb2_grpc, common_pb2
)
# reuse the access_token and hostname to create a channel
channel = create_channel(access_token, hostname)
# connect to Analysis API
analysis_api_stub = analysis_api_pb2_grpc.AnalysisStub(channel)
try:
    # execute an analysis request with distribution metrics
    results = analysis_api_stub.RunAnalysis(
        analysis_api_pb2.NetworkAnalysisStreamRequest(
            trial_range=common_pb2.TrialRange(trial_begin=1, trial_count=5),
            network_analysis_requests=[
                analysis_api_pb2.NetworkAnalysisRequest(
                    reference=network_ref,
                    context=analysis_api_pb2.Context(template_search_paths=["financial_model"]),
                    distribution_metrics=[
                        analysis_api_pb2.DistributionMetricsDescriptor(
                            aggregation=analysis_api_pb2.AggregationDescriptor(aggregation_method="AEP"),
                            windows=[analysis_api_pb2.ProbabilityWindow(min_probability=0.0, max_probability=1.0)],
                        )
                    ],
                )
            ],
        ),
    )
    # get the analysis request id from the gRPC response headers
    request_id = dict(results.initial_metadata()).get("x-graphene-request-id", "UNDEFINED")
    print(f"Analysis request id: {request_id}")
    # each reply in the result stream is one of: header, error, or result
    for result in results:
        if result.WhichOneof("data") == "header":
            print(f"Received header: {result.header}")
        elif result.WhichOneof("data") == "result":
            print(f"Analysis API: received result: {result.result}")
except grpc.RpcError as ex:
    print(f"Analysis API: request failed: {ex.code()}: {ex.details()}")
finally:
    channel.close()
Request an analysis and retrieve the results from S3 asynchronously¶
Request an asynchronous analysis on the business model created in the above example using the Graphene Analysis API. This example continues using hostname and access_token from the authentication example to create a gRPC channel to connect to the Graphene Analysis API.
import grpc
import boto3
from urllib.parse import urlparse
from google.rpc import status_pb2
from google.protobuf import json_format
from proto.public import (
analysis_api_pb2, analysis_api_pb2_grpc, common_pb2
)
# reuse the access_token and hostname to create a channel
channel = create_channel(access_token, hostname)
# connect to Analysis API
analysis_api_stub = analysis_api_pb2_grpc.AnalysisStub(channel)
try:
    # execute an asynchronous analysis request with distribution metrics
    reply, call = analysis_api_stub.StartAsyncAnalysis.with_call(
        analysis_api_pb2.AsyncAnalysisRequest(
            trial_range=common_pb2.TrialRange(trial_begin=1, trial_count=5),
            network_analysis_requests=[
                analysis_api_pb2.NetworkAnalysisRequest(
                    reference=network_ref,
                    context=analysis_api_pb2.Context(template_search_paths=["financial_model"]),
                    distribution_metrics=[
                        analysis_api_pb2.DistributionMetricsDescriptor(
                            aggregation=analysis_api_pb2.AggregationDescriptor(aggregation_method="AEP"),
                            windows=[analysis_api_pb2.ProbabilityWindow(min_probability=0.0, max_probability=1.0)],
                        )
                    ],
                )
            ],
        ),
    )
    # get the S3 location of the analysis results
    results_uri = reply.results_uri
    parsed = urlparse(results_uri)
    bucket_name = parsed.netloc
    key = parsed.path.lstrip("/")
    print(f"Analysis results bucket: {bucket_name}, key: {key}")
    # read the request status from S3; repeat this until the status is COMPLETE or ERROR
    status_key = f"{key}analysis_status.json"
    s3 = boto3.resource("s3")
    s3_object = s3.Object(bucket_name, status_key).get()
    status_message = json_format.Parse(s3_object["Body"].read(), analysis_api_pb2.AsyncNetworkAnalysisReply())
    if status_message.status.code == common_pb2.Status.Code.ERROR:
        # read the error details from S3
        error_key = f"{key}error_details"
        s3_object = s3.Object(bucket_name, error_key).get()
        error = status_pb2.Status()
        error.ParseFromString(s3_object["Body"].read())
        print(f"Analysis API: async request processing failed: {error}")
    elif status_message.status.code == common_pb2.Status.Code.COMPLETE:
        bucket = s3.Bucket(bucket_name)
        results_key = f"{key}analysis_result_"
        results = list(bucket.objects.filter(Prefix=results_key))
        for result_object in results:
            message = analysis_api_pb2.NetworkAnalysisStreamReply()
            message.ParseFromString(result_object.get()["Body"].read())
            if message.HasField("result"):
                print(f"Analysis API: received result: {message.result}")
except grpc.RpcError as ex:
    print(f"Analysis API: async request failed: {ex.code()}: {ex.details()}")
finally:
    channel.close()
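The example above reads the status object only once. Since the status file may not exist until the asynchronous analysis has progressed, a polling loop is typically needed. The following is a minimal sketch of such a loop; the helper name, the poll interval, and the handling of a not-yet-written status object are assumptions, not part of the Graphene API. It reuses json_format, analysis_api_pb2, and common_pb2 from the example above.
import time

from botocore.exceptions import ClientError

def wait_for_status(s3, bucket_name, status_key, poll_seconds=10):
    # poll analysis_status.json until it reports COMPLETE or ERROR
    while True:
        try:
            body = s3.Object(bucket_name, status_key).get()["Body"].read()
        except ClientError as ex:
            # assumption: the status object simply has not been written yet
            if ex.response["Error"]["Code"] in ("NoSuchKey", "404"):
                time.sleep(poll_seconds)
                continue
            raise
        status_message = json_format.Parse(body, analysis_api_pb2.AsyncNetworkAnalysisReply())
        if status_message.status.code in (
            common_pb2.Status.Code.COMPLETE,
            common_pb2.Status.Code.ERROR,
        ):
            return status_message
        time.sleep(poll_seconds)
A call such as wait_for_status(s3, bucket_name, status_key) can then replace the single read of analysis_status.json in the example above.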