.. index:: Quick Start

Quick Start
===========

Get started with Graphene:

1. `Authentication <#authentication>`__
2. `S3 Uploads <#s3-uploads>`__
3. `Construct a business model <#construct-a-business-model>`__
4. `Request an analysis <#request-an-analysis-and-retrieve-the-results>`__
5. `Request an async analysis <#request-an-analysis-and-retrieve-the-results-from-s3-asynchronously>`__

**Note**: Code examples below are given in **Python**.

.. _authentication:

Authentication
--------------

Graphene User Account Credentials
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

**Graphene user account credentials** are provided by Analyze Re for accessing
Graphene. They take the form of an `Amazon Cognito `__ user and are unique per
user.

The Graphene user account credentials consist of five parts: **username**,
**password**, **user_pool_id**, **client_id**, and **pool_region**.

**user_pool_id:** the id of a `Cognito user pool `__, e.g. ``us-east-1_fpijivb8z``

**client_id:** the id of a `client app `__ in the Cognito user pool, e.g.
``1ameln1vm234htpq11238rmpol``

**pool_region:** the `region `__ of the Cognito user pool, e.g. ``us-east-1``

S3 Authentication
~~~~~~~~~~~~~~~~~~~

S3 authentication is a two-step process. An authorization token (an
`ID token `__) is first obtained using the **Graphene user account
credentials**. That token is then used to obtain **temporary S3 access
credentials**. These credentials expire after one hour, but can be refreshed
indefinitely.

The **temporary S3 access credentials** include ``aws_access_key_id``,
``aws_secret_access_key``, and ``aws_session_token``.

**Note:** Graphene uses two types of authorization token: the **ID token** and
the **access token**.

* The **ID token** can be used to get the hostname, bucket, and temporary S3
  access credentials; see the
  `example <#authenticating-using-cognito-to-access-an-s3-bucket>`__ below.
* The **access token** can be used to connect to Graphene gRPC services; see
  the example in `Graphene Authentication <#graphene-authentication>`__.

Authenticating Using Cognito to Access an S3 Bucket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Modify the Graphene user account credentials in this example to match your own.

.. code:: python

    import boto3
    from warrant_lite import WarrantLite
    from jose import jwt

    ###########################################################################################
    # Modify the following fields of the Graphene user account credentials to your credentials:
    ###########################################################################################
    graphene_creds = {
        "region": "some_region",
        "user_pool_id": "some_user_pool_id",
        "client_id": "some_client_id",
        "username": "some_username",
        "password": "some_password"
    }
    ###########################################################################################

    cognito = WarrantLite(
        username=graphene_creds["username"],
        password=graphene_creds["password"],
        pool_id=graphene_creds["user_pool_id"],
        client_id=graphene_creds["client_id"],
        client=boto3.client(
            "cognito-idp",
            region_name=graphene_creds["region"],
            aws_access_key_id="",
            aws_secret_access_key=""
        ),
    )
    tokens = cognito.authenticate_user()

    # the access token can be used to connect to Graphene gRPC services
    access_token = tokens["AuthenticationResult"]["AccessToken"]

    # the id token can be used to get the hostname, bucket, and temporary S3 access credentials
    id_token = tokens["AuthenticationResult"]["IdToken"]

    # the hostname of the Graphene gRPC service
    hostname = jwt.get_unverified_claims(id_token)["custom:hostname"]

    # the name of the S3 bucket associated with the Cognito user
    bucket_name = jwt.get_unverified_claims(id_token)["custom:bucket"]

    identity_pool_id = jwt.get_unverified_claims(id_token)["custom:identity_pool_id"]

    cognito_identity_client = boto3.client(
        "cognito-identity",
        region_name=graphene_creds["region"],
        aws_access_key_id="",
        aws_secret_access_key=""
    )
    provider_name = f'cognito-idp.{graphene_creds["region"]}.amazonaws.com/{graphene_creds["user_pool_id"]}'
    identity_id = cognito_identity_client.get_id(
        IdentityPoolId=identity_pool_id, Logins={provider_name: id_token}
    )
    aws_creds = cognito_identity_client.get_credentials_for_identity(
        IdentityId=identity_id["IdentityId"], Logins={provider_name: id_token}
    )

    s3_resource = boto3.resource(
        "s3",
        aws_access_key_id=aws_creds["Credentials"]["AccessKeyId"],
        aws_secret_access_key=aws_creds["Credentials"]["SecretKey"],
        aws_session_token=aws_creds["Credentials"]["SessionToken"],
    )
    print(f"Bucket keys: {[obj.key for obj in s3_resource.Bucket(bucket_name).objects.all()]}")
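The temporary S3 access credentials returned by ``get_credentials_for_identity``
stop working after an hour. One simple way to refresh them is to repeat the
token and credential exchange from the example above. Below is a minimal sketch
of that approach, assuming the ``cognito``, ``cognito_identity_client``,
``identity_id``, and ``provider_name`` objects defined above; the helper name
``refresh_s3_credentials`` is illustrative and not part of the Graphene API.

.. code:: python

    def refresh_s3_credentials():
        # re-authenticate to obtain a fresh ID token, then exchange it for new
        # temporary S3 credentials; repeat whenever the previous credentials
        # approach their expiry time
        tokens = cognito.authenticate_user()
        id_token = tokens["AuthenticationResult"]["IdToken"]
        aws_creds = cognito_identity_client.get_credentials_for_identity(
            IdentityId=identity_id["IdentityId"],
            Logins={provider_name: id_token},
        )
        # "Expiration" reports when these credentials stop working
        print(f"New credentials expire at: {aws_creds['Credentials']['Expiration']}")
        return boto3.resource(
            "s3",
            aws_access_key_id=aws_creds["Credentials"]["AccessKeyId"],
            aws_secret_access_key=aws_creds["Credentials"]["SecretKey"],
            aws_session_token=aws_creds["Credentials"]["SessionToken"],
        )

    s3_resource = refresh_s3_credentials()

Alternatively, the refresh token returned in ``AuthenticationResult`` can be
used with the Cognito ``InitiateAuth`` API (``REFRESH_TOKEN_AUTH`` flow) to
renew tokens without re-sending the password.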
S3 Uploads
----------

First, a couple of relevant S3 terms and definitions:

1. A `bucket `__ is a container for storing files.
2. A `key `__ is a string which uniquely identifies a file in an S3 bucket.
3. A `key prefix `__ is an initial substring of a key which may be common among
   multiple keys, implicitly grouping them into a collection. (The **key
   prefix** may be thought of as a **directory** on a filesystem.)

Each Graphene user account gets a unique, isolated bucket. The bucket name can
be obtained through the above
`example <#authenticating-using-cognito-to-access-an-s3-bucket>`__.

Input parquet files must be uploaded with a key having **at minimum** a key
prefix of ``uploads/ledgers/``. Longer prefixes may be used to further group
files. Check :doc:`Ledger Format ` for details of the Graphene input data
format.

**Note:** Once files are uploaded under a given key, **they must not be
modified or deleted at that key.** To upload modified data, users must upload
to a new key (again with a minimum key prefix of ``uploads/ledgers/``).
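As a sketch of the upload step, assuming the ``s3_resource`` and
``bucket_name`` objects from the authentication example above; the local file
name and the extra grouping prefixes are illustrative only.

.. code:: python

    # upload a local parquet file under the required uploads/ledgers/ key prefix;
    # the "2024-01/" portion is only an example of further grouping
    local_file = "my_losses.parquet"
    key = "uploads/ledgers/2024-01/my_losses.parquet"
    s3_resource.Bucket(bucket_name).upload_file(local_file, key)

    # to upload a modified version of the data later, write it under a new key
    # instead of overwriting the existing one
    new_key = "uploads/ledgers/2024-02/my_losses.parquet"
    s3_resource.Bucket(bucket_name).upload_file(local_file, new_key)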
Construct a business model
--------------------------

Construct a basic business model using the :doc:`Graphene Data API `.

Graphene authentication
~~~~~~~~~~~~~~~~~~~~~~~~~

The `access token `__ obtained while authenticating with S3 (see the
`example <#authenticating-using-cognito-to-access-an-s3-bucket>`__) can be
provided directly to a Graphene gRPC request without any further steps. It is
provided with every request to the :doc:`Graphene Data API ` or the
:doc:`Graphene Analysis API `.

Create a network as a business model
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This example shows how to build a network with a loss set node as a business
model using the :doc:`Graphene Data API `. Check the `data model `__ for
details of the business model.

A loss set node allows Graphene to access input data. Graphene input data can
be either a single parquet file or a collection of parquet files, so the
Graphene input data path can be either a full S3 path that ends with
``.parquet`` (e.g. ``s3://<bucket_name>/uploads/ledgers/<...a_single_file_name>.parquet``)
or a full S3 path that ends with a slash ``/`` delimiter (e.g.
``s3://<bucket_name>/uploads/ledgers/<...some_prefix_name>/``).

This example continues using ``bucket_name``, ``hostname``, and
``access_token`` from the above
`example <#authenticating-using-cognito-to-access-an-s3-bucket>`__. The
``hostname`` and ``access_token`` are used to create a `gRPC channel `__ which
provides the connection to the :doc:`Graphene Data API ` or the
:doc:`Graphene Analysis API `. You will need to modify the
``input_data_path`` to be the path of your uploaded data from the
`S3 upload <#s3-uploads>`__ step:

.. code:: python

    import json

    import grpc

    from proto.public import data_management_api_pb2, data_management_api_pb2_grpc

    #######################################################################
    # Modify the s3 path below to match where your data have been uploaded:
    #######################################################################
    input_data_path = f"s3://{bucket_name}/"
    #######################################################################

    def grpc_keepalive_options(keepalive_seconds):
        options = [
            ("grpc.keepalive_time_ms", keepalive_seconds * 1000),
            ("grpc.keepalive_permit_without_calls", 1),
            ("grpc.http2.max_pings_without_data", 0),
        ]
        # the default minimum is 5 minutes, so if this is less, we also need to set the minimum
        grpc_default_minimum_s = 5 * 60
        if keepalive_seconds < grpc_default_minimum_s:
            options.append(("grpc.http2.min_time_between_pings_ms", keepalive_seconds * 1000))
        return options

    def get_channel_config(access_token):
        channel_creds = grpc.ssl_channel_credentials()
        call_creds = grpc.access_token_call_credentials(access_token)
        creds = grpc.composite_channel_credentials(channel_creds, call_creds)
        channel_args = [creds, grpc_keepalive_options(60)]
        return channel_args

    def create_channel(access_token, hostname):
        channel_opts = get_channel_config(access_token)
        return grpc.secure_channel("{}:{}".format(hostname, 443), *channel_opts)

    # reuse the access_token and hostname to create a channel
    channel = create_channel(access_token, hostname)

    # connect to the Data API
    data_management_stub = data_management_api_pb2_grpc.DataManagementStub(channel)

    try:
        # create a loss set node with the specified input data path
        loss_set_properties = {
            "_schema": "LossSet_1.0",
            "path": input_data_path,
            "occurrence_key_column": "EventId",
            "label": "LossSet[v1]",
        }
        node_ref = data_management_stub.CreateNode(
            data_management_api_pb2.CreateNodeRequest(
                properties=data_management_api_pb2.Properties(
                    data=json.dumps(loss_set_properties).encode(),
                    encoding=data_management_api_pb2.Properties.JSON
                )
            ),
        ).reference
        print(f"Loss set node created: {node_ref}")

        # create a network consisting of just the singular node
        network_ref = data_management_stub.CreateNetwork(
            data_management_api_pb2.CreateNetworkRequest(node_references=[node_ref]),
        ).reference
        print(f"Network created: {network_ref}")
    except grpc.RpcError as ex:
        print(f"Data API request failed: {ex.code()}: {ex.details()}")
    finally:
        channel.close()

.. _request-analysis-example:

Request an analysis and retrieve the results
----------------------------------------------

Request an analysis on the business model created in the above
`example <#create-a-network-as-a-business-model>`__ using the
:doc:`Graphene Analysis API `.

This example continues using ``hostname`` and ``access_token`` from the
`example <#authenticating-using-cognito-to-access-an-s3-bucket>`__ to create a
gRPC channel to connect to the :doc:`Graphene Analysis API `.
.. code:: python

    import grpc

    from proto.public import (
        analysis_api_pb2,
        analysis_api_pb2_grpc,
        analysis_pb2,
        common_pb2
    )

    # reuse the access_token and hostname to create a channel
    channel = create_channel(access_token, hostname)

    # connect to the Analysis API
    analysis_api_stub = analysis_api_pb2_grpc.AnalysisStub(channel)

    try:
        # execute an analysis request with distribution metrics.
        results = analysis_api_stub.RunAnalysis(
            analysis_api_pb2.NetworkAnalysisStreamRequest(
                trial_range=common_pb2.TrialRange(trial_begin=1, trial_count=5),
                network_analysis_requests=[
                    analysis_api_pb2.NetworkAnalysisRequest(
                        reference=network_ref,
                        context=analysis_api_pb2.Context(template_search_paths=["financial_model"]),
                        distribution_metrics=[
                            analysis_pb2.DistributionMetricsDescriptor(
                                aggregation=analysis_pb2.AggregationDescriptor(aggregation_method="AEP"),
                                windows=[analysis_pb2.ProbabilityWindow(min_probability=0.0, max_probability=1.0)],
                            )
                        ],
                    )
                ],
            ),
        )

        # get the analysis request id from the gRPC header.
        request_id = dict(results.initial_metadata()).get("x-graphene-request-id", "UNDEFINED")
        print(f"Analysis request id: {request_id}")

        # each result from the stream can be one of header, error, or result.
        for result in results:
            if result.WhichOneof("data") == "header":
                print(f"Received header: {result.header}")
            elif result.WhichOneof("data") == "error":
                print(f"Analysis API: received error: {result.error}")
            elif result.WhichOneof("data") == "result":
                print(f"Analysis API: received result: {result.result}")
    except grpc.RpcError as ex:
        print(f"Analysis API: request failed: {ex.code()}: {ex.details()}")
    finally:
        channel.close()

Request an analysis and retrieve the results from S3 asynchronously
---------------------------------------------------------------------

Request an asynchronous analysis on the business model created in the above
`example <#create-a-network-as-a-business-model>`__ using the
:doc:`Graphene Analysis API `.

This example continues using ``hostname`` and ``access_token`` from the
`example <#authenticating-using-cognito-to-access-an-s3-bucket>`__ to create a
gRPC channel to connect to the :doc:`Graphene Analysis API `.

.. code:: python

    import grpc
    import boto3
    from urllib.parse import urlparse

    from google.rpc import status_pb2
    from google.protobuf import json_format

    from proto.public import (
        analysis_api_pb2,
        analysis_api_pb2_grpc,
        analysis_pb2,
        common_pb2
    )

    # reuse the access_token and hostname to create a channel
    channel = create_channel(access_token, hostname)

    # connect to the Analysis API
    analysis_api_stub = analysis_api_pb2_grpc.AnalysisStub(channel)

    try:
        # execute an analysis request with distribution metrics.
        reply, call = analysis_api_stub.StartAsyncAnalysis.with_call(
            analysis_api_pb2.AsyncAnalysisRequest(
                trial_range=common_pb2.TrialRange(trial_begin=1, trial_count=5),
                network_analysis_requests=[
                    analysis_api_pb2.NetworkAnalysisRequest(
                        reference=network_ref,
                        context=analysis_api_pb2.Context(template_search_paths=["financial_model"]),
                        distribution_metrics=[
                            analysis_pb2.DistributionMetricsDescriptor(
                                aggregation=analysis_pb2.AggregationDescriptor(aggregation_method="AEP"),
                                windows=[analysis_pb2.ProbabilityWindow(min_probability=0.0, max_probability=1.0)],
                            )
                        ],
                    )
                ],
            ),
        )

        # get the s3 location of the analysis results.
        results_uri = reply.results_uri
        parsed = urlparse(results_uri)
        bucket_name = parsed.netloc
        key = parsed.path.lstrip('/')
        print(f"Analysis results bucket: {bucket_name}, key: {key}")

        # read the request status from s3. repeat this until the status is COMPLETE or ERROR.
        status_key = f"{key}analysis_status.json"
        s3 = boto3.resource("s3")
        s3_object = s3.Object(bucket_name, status_key).get()
        status_message = json_format.Parse(
            s3_object["Body"].read(), analysis_api_pb2.AsyncNetworkAnalysisReply()
        )

        if status_message.status.code == common_pb2.Status.Code.ERROR:
            # read the error details from s3.
            error_key = f"{key}error_details"
            s3_object = s3.Object(bucket_name, error_key).get()
            error = status_pb2.Status()
            error.ParseFromString(s3_object["Body"].read())
            print(f"Analysis API: async request processing failed: {error}")
        elif status_message.status.code == common_pb2.Status.Code.COMPLETE:
            bucket = s3.Bucket(bucket_name)
            results_key = f"{key}analysis_result_"
            results = list(bucket.objects.filter(Prefix=results_key))
            for s3_object in results:
                message = analysis_api_pb2.NetworkAnalysisStreamReply()
                message.ParseFromString(s3_object.get()["Body"].read())
                if message.HasField("result"):
                    print(f"Analysis API: received result: {message.result}")
    except grpc.RpcError as ex:
        print(f"Analysis API: async request failed: {ex.code()}: {ex.details()}")
    finally:
        channel.close()
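The snippet above reads the status object only once, but the status file may
not yet exist (or may not yet report a final state) immediately after
``StartAsyncAnalysis`` returns, so in practice the read should be repeated
until the status is ``COMPLETE`` or ``ERROR``. Below is a minimal polling
sketch, assuming the ``s3``, ``bucket_name``, and ``status_key`` variables from
the example above; the helper name and the poll interval are arbitrary choices.

.. code:: python

    import time

    import botocore.exceptions

    def wait_for_final_status(poll_seconds=10):
        # poll the status object until the analysis reports COMPLETE or ERROR
        while True:
            try:
                s3_object = s3.Object(bucket_name, status_key).get()
            except botocore.exceptions.ClientError:
                # the status object has not been written yet
                time.sleep(poll_seconds)
                continue
            status_message = json_format.Parse(
                s3_object["Body"].read(), analysis_api_pb2.AsyncNetworkAnalysisReply()
            )
            if status_message.status.code in (
                common_pb2.Status.Code.COMPLETE,
                common_pb2.Status.Code.ERROR,
            ):
                return status_message
            time.sleep(poll_seconds)

    status_message = wait_for_final_status()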