Luis Jaramillo
11/28/2021, 3:31 PM
Sridhar
11/29/2021, 12:38 AM
The get_data_asynchronous() function below creates 10 threads and calls the API concurrently. I am using this function in run_factset_api(). As standalone code this works fine locally, but when I schedule a run on Prefect, the run_factset_api() function exits before execution and returns a coroutine object (locally it returns the desired value). Is there something I should do to facilitate a parallel run on Prefect?
import asyncio
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
import requests

# company, company_info, formulas, and companies_to_fetch are defined elsewhere in the module
async def get_data_asynchronous():
    master = pd.DataFrame()  # collects the records returned by each thread
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `fetch`
            loop = asyncio.get_event_loop()
            tasks = [
                loop.run_in_executor(
                    executor,
                    company.get_company_records,
                    *(session, [companies], {**company_info, **formulas})
                    # Allows us to pass in multiple arguments to `fetch`
                )
                for companies in companies_to_fetch
            ]
            for response in await asyncio.gather(*tasks):
                master = master.append(response, ignore_index=True)
    return master
@task
def run_factset_api():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    master = loop.run_until_complete(future)
    return master
@task
def save_data_to_s3(emmi_reduction):
    s3_resource = boto3.resource('s3')
    s3_resource.Object(bucket, 'factset_output_data.csv').put(Body=csv_buffer.getvalue())
with Flow('api-flow', storage=STORAGE, run_config=RUN_CONFIG) as flow:
    response = run_factset_api()
    if response:
        save_data_to_db(response)

flow.register('pipeline')
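One way to guarantee the coroutine is actually awaited inside the task, instead of leaking out as a bare coroutine object, is to hand it to asyncio.run(), which creates and closes a fresh event loop. A minimal sketch (get_data_asynchronous is the coroutine from the snippet above; asyncio.run assumes no event loop is already running in the agent's process):

import asyncio
from prefect import task

@task
def run_factset_api():
    # asyncio.run() builds a new event loop, runs the coroutine to
    # completion, and returns its result (the gathered DataFrame)
    return asyncio.run(get_data_asynchronous())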
Priyab Dash
11/29/2021, 9:08 AM
@task(log_stdout=True, state_handlers=[notify_run_failure])
def submit_job_run_to_tmc(job_run):
    ...
but this is being called twice when we run a flow.
Gabriel Milan
11/29/2021, 11:45 AM
The agent section of the values.yaml file looks like this:
agent:
  enabled: true
  prefectLabels:
    - mylabel
  ...
  job:
    ...
    envFrom:
      - secretRef:
          name: gcp-credentials
The secret gcp-credentials exists and is correct. Unfortunately, this doesn't seem to work.
Zohaa Qamar
11/29/2021, 2:07 PM
Brian Phillips
11/29/2021, 2:59 PM
Tim Enders
11/29/2021, 3:39 PM
Guillaume Latour
11/29/2021, 5:11 PM
(prefect agent local start so it uses my local timezone)
Is there any configuration option that I am missing that could provide a custom timezone to all the Docker containers launched via prefect server start?
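One generic option for flow-run containers (not the server's own services) is the standard tzdata TZ variable, injected via the run config. A sketch, assuming Prefect 1.x with a Docker agent and an image that ships tzdata; the zone name is illustrative:

from prefect.run_configs import DockerRun

# TZ is the standard tzdata environment variable; substitute your own zone
flow.run_config = DockerRun(env={"TZ": "Europe/Brussels"})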
dammy arinde
11/29/2021, 5:33 PM
I use client.create_flow_run() to run the dependent flows, but when I add a parameter to it, it is not passed to the dependent flow.
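For reference, the Prefect 1.x Client accepts a parameters dict directly, and the child flow must declare a Parameter whose name matches the key. A minimal sketch with a placeholder flow id and a hypothetical parameter name:

from prefect import Client

client = Client()
# "run_date" must match a Parameter defined in the child flow;
# the flow id below is a placeholder
client.create_flow_run(
    flow_id="<child-flow-id>",
    parameters={"run_date": "2021-11-29"},
)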
Isaac Brodsky
11/29/2021, 7:23 PM
AttributeError: 'Context' object has no attribute 'image' when DaskExecutor tries to start up a cluster.
Derek Heyman
11/29/2021, 8:18 PM
Philip MacMenamin
11/29/2021, 9:36 PM
joshua mclellan
11/29/2021, 10:30 PM
Sandip Viradiya
11/30/2021, 12:10 AM
Dekel R
11/30/2021, 8:11 AM
File "/usr/local/lib/python3.9/site-packages/gcsfs/credentials.py", line 84, in _connect_google_default
    raise ValueError(msg.format(self.project, project))
ValueError: User-provided project 'my_project' does not match the google default project 'some_generated_id'. Either
Some code snippets and information about my flow -
from prefect import Flow
from prefect.storage import Docker
from tasks.extract_product_data import extract_data
from prefect.run_configs import VertexRun
from prefect.schedules import IntervalSchedule
from datetime import timedelta

schedule = IntervalSchedule(interval=timedelta(days=1))

with Flow("extract_comparable_products_data",
          storage=Docker(registry_url="us-central1-docker.pkg.dev/xxxx/",
                         dockerfile="./Dockerfile"), schedule=schedule) as flow:
    extract_data()

flow.run_config = VertexRun(machine_type='e2-standard-16', labels=["ml"],
                            service_account='prefect_service_account.iam.gserviceaccount.com')
The flow has only one task for now, for testing purposes.
My task uses data from multiple projects of my organization (Google Cloud projects), so in every google_client interaction I use a specific project as a parameter, for example -
storage_client = storage.Client(project='my_pro_1')
The service account that Prefect uses has permissions on all of the relevant projects (in general: Storage, BigQuery, Artifactory, Vertex AI).
Anyone familiar with this issue? Thanks.
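Not from the thread, but one thing to check: gcsfs raises this error when the project passed by the caller differs from the project attached to the default credentials. A sketch of one possible workaround, assuming VertexRun's env can be used to align the default project (GOOGLE_CLOUD_PROJECT is the standard variable google.auth consults; the project id is illustrative):

from prefect.run_configs import VertexRun

# assumption: aligning the credentials' default project with the one the
# code passes explicitly avoids the gcsfs mismatch check
flow.run_config = VertexRun(
    machine_type='e2-standard-16',
    labels=["ml"],
    env={"GOOGLE_CLOUD_PROJECT": "my_pro_1"},
)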
Emma Rizzi
11/30/2021, 10:35 AM
Guilherme Petris
11/30/2021, 10:38 AM
Tom Klein
11/30/2021, 1:04 PM
Prefect-core and prefect-orion? And maybe also some clarity about which of them runs when we go down the cloud path?
Zheng Xie
11/30/2021, 1:17 PM
Samuel Hinton
11/30/2021, 2:45 PM
Martim Lobao
11/30/2021, 5:53 PM
bral
11/30/2021, 6:41 PM
Isaac Brodsky
11/30/2021, 8:28 PM
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \"dask-root-elided\" is forbidden: User \"system:serviceaccount:default:default\" cannot get resource \"pods/log\" in API group \"\" in the namespace \"default\"","reason":"Forbidden","details":{"name":"dask-root-elided","kind":"pods"},"code":403}
Was trying to install a new Prefect agent and it seems the RBAC setup is not right? The agent was configured with prefect agent kubernetes install --key $PREFECT_API_KEY --rbac --label my_label_here
Isobel Jones
12/01/2021, 11:01 AM
Failed to load and execute Flow's environment: AssertionError(None)
Trying to access a private GitHub repo for the first time. I have set the GitHub token. Running on GKE.
flow.storage = GitHub(
    repo="Infrastructure/prefect",  # name of repo
    path="_cfg/server/hello.py",  # location of flow file in repo
    base_url="https://github.private-repo.com/"
)
We are setting the env variable which contains the token.
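Not from the thread, but for comparison: GitHub storage in Prefect 1.x can also read the token from a Prefect Secret named via access_token_secret, rather than from a raw environment variable. A sketch, assuming a Secret called GITHUB_ACCESS_TOKEN exists:

from prefect.storage import GitHub

flow.storage = GitHub(
    repo="Infrastructure/prefect",
    path="_cfg/server/hello.py",
    base_url="https://github.private-repo.com/",
    # assumption: a Prefect Secret with this name holds the token
    access_token_secret="GITHUB_ACCESS_TOKEN",
)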
Aqib Fayyaz
12/01/2021, 12:47 PM
André Petersen
12/01/2021, 1:55 PM
Wieger Opmeer
12/01/2021, 2:39 PM
Leon Kozlowski
12/01/2021, 2:49 PM
Daniel Suissa
12/01/2021, 3:54 PM
Daniel Suissa
12/01/2021, 3:57 PM
Daniel Suissa
12/01/2021, 3:57 PM
Kevin Kho
12/01/2021, 4:10 PM
flow.run() is not meant for production, so you need to register the subflows and then trigger them with the StartFlowRun or create_flow_run task. You could instead call one task inside another, like this:
@task
def abc():
    return 1
@task
def bcd():
    abc.run()
but the abc in bcd is no longer a task; it's like a regular Python function, so you don't get observability.
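A minimal sketch of the registered-subflow route Kevin describes, using the create_flow_run task from prefect.tasks.prefect (the flow and project names are placeholders):

from prefect import Flow
from prefect.tasks.prefect import create_flow_run

with Flow("parent-flow") as flow:
    # assumption: "child-flow" is already registered in project "my-project"
    create_flow_run(flow_name="child-flow", project_name="my-project")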
Daniel Suissa
12/01/2021, 4:23 PM
Kevin Kho
12/01/2021, 4:26 PM
Daniel Suissa
12/01/2021, 4:29 PM
Kevin Kho
12/01/2021, 4:34 PM
@task
def plus_one(x):
    return x + 1

with Flow("name") as flow:
    a = plus_one(1)
    b = plus_one(a)
This will implicitly build the dependencies for you and pass the value of a in memory.
with Flow(...) as flow:
    a = first_task()
    b = second_task()
    c = third_task(c_inputs, upstream_tasks=[a, b])
or
with Flow("ex") as flow:
    a = first_task()
    b = second_task(upstream_tasks=[a])
    c = third_task(upstream_tasks=[b])
if there is no data dependency.
Daniel Suissa
12/01/2021, 4:44 PM
Kevin Kho
12/01/2021, 4:45 PM