How to output to Blob service with Durable functions


I am new to Azure Functions and Durable Functions. I want to write code in which the activity function, called by the orchestrator function, computes a value and writes it to the Blob service.

How can I modify the code below to output the value generated by the activity function to the Blob service?

The environment I am using is:

  • Azure Functions, Durable Functions
  • Visual Studio Code
  • Python programming model v2

The following is a simplified version of the code I am considering. Please modify it. Incidentally, the client and orchestrator functions probably have little to do with the issue.

import azure.functions as func
import azure.durable_functions as df
import logging

app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

### client function ###
@app.route(route="orchestrators/client_function")
@app.durable_client_input(client_name="client")
async def client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
    instance_id = await client.start_new("orchestrator", None, {})
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    await client.wait_for_completion_or_create_check_status_response(req, instance_id)
    status = await client.get_status(instance_id)
    return f"output: {status.output}"

### orchestrator function ###
@app.orchestration_trigger(context_name="context")
def orchestrator(context: df.DurableOrchestrationContext) -> dict:
    test = yield context.call_activity("main", "")
    return {"Test": test}

### activity function ###
@app.blob_output(arg_name="outputblob",
                 path="newblob/test.txt",
                 connection="BlobStorageConnection")
@app.activity_trigger(input_name="blank")
def main(blank: str, outputblob: func.Out[str]):
    string = "Data is successfully Inserted"
    logging.info(f'Python Queue trigger function processed {len(string)} bytes')
    outputblob.set(string)
    return "Completed"

local.settings.json

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
    "BlobStorageConnection": "DefaultEndpointsProtocol=https;AccountNa***"
  }
}

Error message: [screenshot not available]

Best answer

I used the code below to write the value generated by the activity function to Blob storage.

function_app.py

import azure.functions as func
import azure.durable_functions as df
import logging

app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

### client function ###
# HTTP-triggered starter: launches the orchestration and returns its output.
@app.route(route="orchestrators/client_function")
@app.durable_client_input(client_name="client")
async def client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
    instance_id = await client.start_new("orchestrator", None, {})
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    await client.wait_for_completion_or_create_check_status_response(req, instance_id)
    status = await client.get_status(instance_id)
    return f"output: {status.output}"

### orchestrator function ###
# Calls the activity named "main" and wraps its return value in a dict.
@app.orchestration_trigger(context_name="context")
def orchestrator(context: df.DurableOrchestrationContext) -> dict:
    test = yield context.call_activity("main", "")
    return {"Test": test}

### activity function ###
# I want to calculate the value and output it to the bound service.
# The blob output binding writes whatever is passed to outputblob.set()
# to newblob/test.txt, using the connection string stored in the
# BlobStorageConnection app setting.
@app.blob_output(arg_name="outputblob",
                 path="newblob/test.txt",
                 connection="BlobStorageConnection")
@app.activity_trigger(input_name="blank")
def main(blank: str, outputblob: func.Out[str]):
    string = "Data is successfully Inserted"
    logging.info(f'Activity function processed {len(string)} characters')
    outputblob.set(string)
    return "Completed"
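
The body of the activity can also be exercised locally without the Functions host. The following is only a minimal sketch under assumptions: FakeOut is a hypothetical stand-in that mimics the set/get methods of func.Out, and write_result is a hypothetical helper that repeats the logic of main so it can be called directly. Neither name is part of the Azure SDK.

```python
import logging


class FakeOut:
    """Hypothetical stand-in for func.Out[str]; remembers the value passed to set()."""

    def __init__(self) -> None:
        self._value = None

    def set(self, val: str) -> None:
        self._value = val

    def get(self) -> str:
        return self._value


def write_result(outputblob) -> str:
    """Hypothetical helper with the same logic as the activity `main`:
    compute a value, hand it to the output binding, return a status string."""
    string = "Data is successfully Inserted"
    logging.info("Activity function processed %d characters", len(string))
    outputblob.set(string)
    return "Completed"


out = FakeOut()
result = write_result(out)
print(result, "|", out.get())  # prints: Completed | Data is successfully Inserted
```

In the real function app the runtime supplies the func.Out object and persists the value to newblob/test.txt; the stand-in only confirms that the activity passes the expected string to the binding.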

local.settings.json

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
    "BlobStorageConnection": "DefaultEndpointsProtocol=**********"
  }
}

Copy the storage account's connection string from the Azure portal (Storage account > Access keys) and set it as the value of BlobStorageConnection in local.settings.json.
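
Before starting the host, it may help to confirm that the setting named in the binding's connection argument actually exists under "Values". The sketch below is an illustration only: missing_settings is a hypothetical helper, and the sample text is a made-up settings file that deliberately omits BlobStorageConnection.

```python
import json


def missing_settings(settings_text: str, required: list[str]) -> list[str]:
    """Return the required setting names that are absent or empty under "Values"."""
    values = json.loads(settings_text).get("Values", {})
    return [name for name in required if not values.get(name)]


# Made-up example content; BlobStorageConnection is intentionally left out.
sample = """
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "python"
  }
}
"""

print(missing_settings(sample, ["AzureWebJobsStorage", "BlobStorageConnection"]))
# prints: ['BlobStorageConnection']
```

To check a real project, the text could be read from local.settings.json with open("local.settings.json").read() instead of the sample string.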

Output: [screenshots not available]