Quick Start

Get started with Graphene:

  1. Authentication

  2. S3 Uploads

  3. Construct a business model

  4. Request an analysis

  5. Request an async analysis

Note: Code examples below are given in Python.

Authentication

Graphene User Account Credentials

Graphene user account credentials are provided by Analyze Re for accessing Graphene. They take the form of an Amazon Cognito user and are unique per user.

The Graphene user account credentials consist of five parts: username, password, user_pool_id, client_id, and pool_region.

user_pool_id: ID of a Cognito user pool, e.g. us-east-1_fpijivb8z

client_id: ID of a client app in the Cognito user pool, e.g. 1ameln1vm234htpq11238rmpol

pool_region: region of the Cognito user pool, e.g. us-east-1

S3 Authentication

S3 authentication is a two-step process. An ID token is first obtained using the Graphene user account credentials; that token is then used to obtain temporary S3 access credentials. These credentials expire after one hour, but can be refreshed indefinitely.

The temporary S3 access credentials include aws_access_key_id, aws_secret_access_key, and aws_session_token.

Note: Graphene uses two types of authorization tokens: the ID token and the access token.

  • The ID token can be used to obtain the hostname, bucket name, and temporary S3 access credentials; see the example below.

  • The access token can be used to connect to Graphene gRPC services; see the example in Graphene Authentication.

Authenticating Using Cognito to Access an S3 Bucket

Modify the Graphene user account credentials in this example to match your own.

import boto3
from warrant_lite import WarrantLite
from jose import jwt

###########################################################################################
# Modify the following fields of the Graphene user account credentials to your credentials:
###########################################################################################

graphene_creds = {
    "region": "some_region",
    "user_pool_id": "some_user_pool_id",
    "client_id": "some_client_id",
    "username": "some_username",
    "password": "some_password"
}
###########################################################################################

cognito = WarrantLite(
    username=graphene_creds["username"],
    password=graphene_creds["password"],
    pool_id=graphene_creds["user_pool_id"],
    client_id=graphene_creds["client_id"],
    client=boto3.client(
        "cognito-idp", region_name=graphene_creds["region"], aws_access_key_id="", aws_secret_access_key=""
    ),
)
tokens = cognito.authenticate_user()
# the access token can be used to connect to Graphene gRPC services
access_token = tokens["AuthenticationResult"]["AccessToken"]

# the ID token can be used to get the hostname, bucket, and temporary S3 access credentials
id_token = tokens["AuthenticationResult"]["IdToken"]

# the hostname is the Graphene gRPC service hostname
hostname = jwt.get_unverified_claims(id_token)["custom:hostname"]

# the name of the S3 bucket associated with the Cognito user
bucket_name = jwt.get_unverified_claims(id_token)["custom:bucket"]

identity_pool_id = jwt.get_unverified_claims(id_token)["custom:identity_pool_id"]
cognito_identity_client = boto3.client(
    "cognito-identity", region_name=graphene_creds["region"], aws_access_key_id="", aws_secret_access_key=""
)
provider_name = f'cognito-idp.{graphene_creds["region"]}.amazonaws.com/{graphene_creds["user_pool_id"]}'
identity_id = cognito_identity_client.get_id(IdentityPoolId=identity_pool_id, Logins={provider_name: id_token})

aws_creds = cognito_identity_client.get_credentials_for_identity(
    IdentityId=identity_id["IdentityId"], Logins={provider_name: id_token}
)

s3_resource = boto3.resource(
    "s3",
    aws_access_key_id=aws_creds["Credentials"]["AccessKeyId"],
    aws_secret_access_key=aws_creds["Credentials"]["SecretKey"],
    aws_session_token=aws_creds["Credentials"]["SessionToken"],
)

print(f"Bucket keys: {[ obj.key for obj in s3_resource.Bucket(bucket_name).objects.all()]}")

S3 Uploads

First, a couple of relevant S3 terms and definitions:

  1. A bucket is a container for storing files.

  2. A key is a string which uniquely identifies a file in an S3 bucket.

  3. A key prefix is an initial substring of a key which may be common among multiple keys, implicitly grouping them into a collection. (The key prefix may be thought of as a directory on a filesystem.)

Each Graphene user account gets a unique, isolated bucket. The bucket name can be obtained as shown in the example above. Input parquet files must be uploaded with a key having, at minimum, the key prefix uploads/ledgers/. Longer prefixes may be used to further group files. See Ledger Format for details of the Graphene input data format.

Note: Once files are uploaded under a given key, they must not be modified or deleted at that key. To upload modified data, upload it under a new key (again with a minimum key prefix of uploads/ledgers/). A minimal upload sketch follows.
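
Below is a minimal upload sketch using the s3_resource and bucket_name from the authentication example above; the local file name losses.parquet and the key suffix portfolio_2024/losses.parquet are placeholders.

# Upload a local parquet file to a key under the required uploads/ledgers/ prefix.
# The local file name and key suffix below are placeholders; choose your own.
upload_key = "uploads/ledgers/portfolio_2024/losses.parquet"
s3_resource.Bucket(bucket_name).upload_file("losses.parquet", upload_key)
print(f"Uploaded to s3://{bucket_name}/{upload_key}")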

Construct a business model

Construct a basic business model using the Graphene Data API.

Graphene Authentication

The access token obtained while authenticating with S3 (see the example above) can be provided directly with Graphene gRPC requests without any further steps. It is provided with every request to the Graphene Data API or the Graphene Analysis API.

Create a network as a business model

This example shows how to build a network with a loss set node as a business model using the Graphene Data API. See the data model for details of the business model. A loss set node allows Graphene to access input data. Graphene input data can be either a single parquet file or a collection of parquet files, so the Graphene input data path can be either a full S3 path that ends with .parquet (e.g. s3://<bucket_name>/uploads/ledgers/<...a_single_file_name>.parquet) or a full S3 path that ends with a slash / delimiter (e.g. s3://<bucket_name>/uploads/ledgers/<...some_prefix_name>/).
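
For illustration, the two accepted path forms look like this; the portfolio_2024 prefix and losses.parquet file name are placeholders.

# A single parquet file:
single_file_path = f"s3://{bucket_name}/uploads/ledgers/portfolio_2024/losses.parquet"

# A collection of parquet files sharing a key prefix (note the trailing slash):
collection_path = f"s3://{bucket_name}/uploads/ledgers/portfolio_2024/"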

This example continues using bucket_name, hostname, and access_token from the examples above. The hostname and access_token are used to create a gRPC channel, which provides the connection to the Graphene Data API or the Graphene Analysis API.

You will need to modify input_data_path to point at the data you uploaded in S3 Uploads:

import json
import grpc

from proto.public import data_management_api_pb2, data_management_api_pb2_grpc


#######################################################################
# Modify the s3 path below to match where your data have been uploaded:
#######################################################################

input_data_path = f"s3://{bucket_name}/<key>"

#######################################################################


def grpc_keepalive_options(keepalive_seconds):
    options = [
        ("grpc.keepalive_time_ms", keepalive_seconds * 1000),
        ("grpc.keepalive_permit_without_calls", 1),
        ("grpc.http2.max_pings_without_data", 0),
    ]

    # the default minimum is 5 minutes, so if this is less, we also need to set the minimum
    grpc_default_minimum_s = 5 * 60
    if keepalive_seconds < grpc_default_minimum_s:
        options.append(("grpc.http2.min_time_between_pings_ms", keepalive_seconds * 1000))
    return options


def get_channel_config(access_token):
    channel_creds = grpc.ssl_channel_credentials()
    call_creds = grpc.access_token_call_credentials(access_token)
    creds = grpc.composite_channel_credentials(channel_creds, call_creds)
    channel_args = [creds, grpc_keepalive_options(60)]
    return channel_args


def create_channel(access_token, hostname):
    channel_opts = get_channel_config(access_token)
    return grpc.secure_channel("{}:{}".format(hostname, 443), *channel_opts)


# reuse the access_token and hostname to create a channel
channel = create_channel(access_token, hostname)

# connect to Data API
data_management_stub = data_management_api_pb2_grpc.DataManagementStub(channel)


try:
    # create a loss set node with specified input data path
    loss_set_properties = {
        "_schema": "LossSet_1.0",
        "path": input_data_path,
        "occurrence_key_column": "EventId",
        "label": f"LossSet[v1]",
    }
    # send the CreateNode request and keep the returned node reference
    node_ref = data_management_stub.CreateNode(
        data_management_api_pb2.CreateNodeRequest(
            properties=data_management_api_pb2.Properties(
                data=json.dumps(loss_set_properties).encode(), encoding=data_management_api_pb2.Properties.JSON
            )
        ),
    ).reference
    print(f"Loss set node created: {node_ref}")

    # create a network consisting of just the singular node
    network_ref = data_management_stub.CreateNetwork(
        data_management_api_pb2.CreateNetworkRequest(node_references=[node_ref]),
    ).reference
    print(f"Network created: {network_ref}")

except grpc.RpcError as ex:
    print(f"Data API request failed: {ex.code()}: {ex.details()}")
finally:
    channel.close()

Request an analysis and retrieve the results

Request an analysis on the business model created in the example above using the Graphene Analysis API. This example continues using hostname and access_token from the earlier examples to create a gRPC channel to connect to the Graphene Analysis API, and reuses the create_channel helper and network_ref from the previous example.

import grpc
from proto.public import (
    analysis_api_pb2, analysis_api_pb2_grpc, analysis_pb2, common_pb2
)


# reuse the access_token and hostname to create a channel
channel = create_channel(access_token, hostname)

# connect to Analysis API
analysis_api_stub = analysis_api_pb2_grpc.AnalysisStub(channel)

try:
    # execute an analysis request with distribution metrics.
    results = analysis_api_stub.RunAnalysis(
        analysis_api_pb2.NetworkAnalysisStreamRequest(
            trial_range=common_pb2.TrialRange(trial_begin=1, trial_count=5),
            network_analysis_requests=[
                analysis_api_pb2.NetworkAnalysisRequest(
                    reference=network_ref,
                    context=analysis_api_pb2.Context(template_search_paths=["financial_model"]),
                    distribution_metrics=[
                        analysis_pb2.DistributionMetricsDescriptor(
                            aggregation=analysis_pb2.AggregationDescriptor(aggregation_method="AEP"),
                            windows=[analysis_pb2.ProbabilityWindow(min_probability=0.0, max_probability=1.0)],
                        )
                    ],
                )
            ],
        ),
    )

    # get analysis request id from gRPC header.
    request_id = dict(results.initial_metadata()).get("x-graphene-request-id", "UNDEFINED")
    print(f"Analysis request id: {request_id}")

    # each reply from the result stream is one of header, error, or result.
    for result in results:
        if result.WhichOneof("data") == "header":
            print(f"Received header: {result.header}")
        elif result.WhichOneof("data") == "result":
            print(f"Analysis API: received result: {result.result}")
        elif result.WhichOneof("data") == "error":
            print(f"Analysis API: received error: {result.error}")

except grpc.RpcError as ex:
    print(f"Analysis API: request failed: {ex.code()}: {ex.details()}")
finally:
    channel.close()

Request an analysis and retrieve the results from S3 asynchronously

Request an asynchronous analysis on the business model created in the example above using the Graphene Analysis API. This example continues using hostname and access_token from the earlier examples to create a gRPC channel to connect to the Graphene Analysis API, and reuses the create_channel helper and network_ref from the previous example.

import grpc
import boto3
from urllib.parse import urlparse
from google.rpc import status_pb2
from google.protobuf import json_format

from proto.public import (
    analysis_api_pb2, analysis_api_pb2_grpc, analysis_pb2, common_pb2
)


# reuse the access_token and hostname to create a channel
channel = create_channel(access_token, hostname)

# connect to Analysis API
analysis_api_stub = analysis_api_pb2_grpc.AnalysisStub(channel)

try:
    # execute an analysis request with distribution metrics.
    reply, call = analysis_api_stub.StartAsyncAnalysis.with_call(
        analysis_api_pb2.AsyncAnalysisRequest(
            trial_range=common_pb2.TrialRange(trial_begin=1, trial_count=5),
            network_analysis_requests=[
                analysis_api_pb2.NetworkAnalysisRequest(
                    reference=network_ref,
                    context=analysis_api_pb2.Context(template_search_paths=["financial_model"]),
                    distribution_metrics=[
                        analysis_pb2.DistributionMetricsDescriptor(
                            aggregation=analysis_pb2.AggregationDescriptor(aggregation_method="AEP"),
                            windows=[analysis_pb2.ProbabilityWindow(min_probability=0.0, max_probability=1.0)],
                        )
                    ],
                )
            ],
        ),
    )

    # get analysis results s3 location.
    results_uri = reply.results_uri
    parsed = urlparse(results_uri)
    bucket_name = parsed.netloc
    key = parsed.path.lstrip('/')
    print(f"Analysis results bucket: {bucket_name}, key: {key}")

    # read the request status from S3; in practice, repeat this read until the
    # status is COMPLETE or ERROR (a polling sketch follows this example).
    status_key = f"{key}analysis_status.json"
    # reading the results requires AWS credentials with access to the results
    # bucket, e.g. the temporary S3 access credentials obtained earlier
    s3 = boto3.resource("s3")
    s3_object = s3.Object(bucket_name, status_key).get()
    status_message = json_format.Parse(s3_object["Body"].read(), analysis_api_pb2.AsyncNetworkAnalysisReply())
    if status_message.status.code == common_pb2.Status.Code.ERROR:
        # read error details from s3.
        error_key = f"{key}error_details"
        s3_object = s3.Object(bucket_name, error_key).get()
        error = status_pb2.Status()
        error.ParseFromString(s3_object["Body"].read())
        print(f"Analysis API: async request processing failed: {error}")
    elif status_message.status.code == common_pb2.Status.Code.COMPLETE:
        bucket = s3.Bucket(bucket_name)
        results_key = f"{key}analysis_result_"
        results = list(bucket.objects.filter(Prefix=results_key))
        for s3_object in results:
            message = analysis_api_pb2.NetworkAnalysisStreamReply()
            message.ParseFromString(s3_object.get()["Body"].read())
            if message.HasField("result"):
                print(f"Analysis API: received result: {message.result}")

except grpc.RpcError as ex:
    print(f"Analysis API: async request failed: {ex.code()}: {ex.details()}")
finally:
    channel.close()
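
The status object is read only once above; in practice it may not exist yet or may not be final when first read. Below is a minimal polling sketch, assuming the s3, bucket_name, and status_key variables from the example above are still in scope. The 30-second interval is an arbitrary choice for this sketch.

import time
import botocore.exceptions

# Poll the status object until the analysis reports COMPLETE or ERROR.
while True:
    try:
        s3_object = s3.Object(bucket_name, status_key).get()
    except botocore.exceptions.ClientError:
        # the status object may not have been written yet
        time.sleep(30)
        continue
    status_message = json_format.Parse(
        s3_object["Body"].read(), analysis_api_pb2.AsyncNetworkAnalysisReply()
    )
    if status_message.status.code in (
        common_pb2.Status.Code.COMPLETE,
        common_pb2.Status.Code.ERROR,
    ):
        break
    time.sleep(30)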