Thomas Pedersen
04/19/2022, 7:36 AM
Daniel Nilsen
04/19/2022, 10:24 AM
Anthony Harris
04/19/2022, 5:46 PM
A and agent B for simplicity), and we specify that a flow can run on either A or B, how is an agent selected to execute the flow run?
Lana Dann
04/19/2022, 6:49 PM
Lana Dann
04/19/2022, 10:43 PM
dbt.log file and manifest.json file that dbt generates after we run each flow, but I only see link artifacts and markdown artifacts in the source code.
Sharath Chandra
04/20/2022, 5:46 AM
LocalExecutor which these mapped jobs are running sequentially.
Are there instances where the LocalExecutor is not able to track the Spark job running on k8s and is thus unable to trigger the subsequent tasks in the map?
Saurabh Sharma
04/20/2022, 7:00 AM
from prefect.storage import S3

STORAGE = S3(
    bucket="prefect-pipelines",
    key="flows/forecaster/flow.py",
    stored_as_script=True,
    # uploads the flow script to S3 at registration time
    local_script_path="flow.py",
)
The following command is used for registering the flow:
prefect register --project forecaster -p forecaster/
But for some reason, the other modules under the forecaster directory are not getting uploaded under the specified S3 key.
Need help solving this. Thanks in advance!
Marc Lipoff
04/20/2022, 4:19 PM
Lana Dann
04/20/2022, 7:37 PM
kevin
04/20/2022, 9:15 PM
from prefect import Flow, Parameter, task

with Flow('foo') as f:
    chck = Parameter('checkpointing')  # Let's say we pass in False
    do_stuff = task(some_python_callable, checkpoint=chck)()
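As far as I understand Prefect 1.x, keyword arguments such as checkpoint are stored on the task object when task(...) builds it inside the with Flow(...) block. That happens before any Parameter has a runtime value, so the flag needs to be an ordinary bool known at build time. A minimal sketch, assuming a hypothetical environment variable FOO_CHECKPOINT controls it (checkpoint_flag is not a Prefect API):

```python
import os


def checkpoint_flag(env_var: str = "FOO_CHECKPOINT", default: bool = True) -> bool:
    """Resolve a checkpoint setting when the flow is *built* (hypothetical env var).

    Returns `default` when the variable is unset; otherwise treats
    1/true/yes/on (case-insensitive) as True and anything else as False.
    """
    raw = os.environ.get(env_var)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


# Hypothetical Prefect 1.x usage, left commented out so this runs without Prefect:
# with Flow("foo") as f:
#     do_stuff = task(some_python_callable, checkpoint=checkpoint_flag())()
```

The trade-off is that the setting is fixed per registration or build, not per run; a value that must vary per run cannot come from a Parameter this way.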
Saurabh Indoria
04/21/2022, 4:23 AM
requests.exceptions.RetryError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by ResponseError('too many 502 error responses'))
And
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by ReadTimeoutError("HTTPSConnectionPool(host='api.prefect.io', port=443): Read timed out. (read timeout=15)",))
Are there any API limits, throttling, etc.?
Saurabh Indoria
04/21/2022, 4:44 AM
Failed to retrieve task state with error: ClientError([{'path': ['get_or_create_task_run_info'], 'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 101}], 'path': None}}}])
Traceback (most recent call last):
  File "/var/www/.venv/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 154, in initialize_run
    task_run_info = self.client.get_task_run_info(
  File "/var/www/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 1479, in get_task_run_info
    result = self.graphql(mutation) # type: Any
  File "/var/www/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 473, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['get_or_create_task_run_info'], 'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 101}], 'path': None}}}]
Can someone please help with this?
Flow run: https://cloud.prefect.io/quiltai/flow-run/f17f2795-af8f-415e-9db7-c6c0643111bd
Chu Lục Ninh
04/22/2022, 8:40 AM
kubernetes.RunNamespacedJob, when running a long-running job (in my case, half of the day), I got a huge memory leak. My suspicion is that this task uses a ThreadPoolExecutor to poll the job log every 5 s using ReadNamespacedPodLogs. Since ReadNamespacedPodLogs is executed in another thread's context, its refcount doesn't drop to 0 to trigger GC when we assign a new value to read_pod_logs, which leaves an orphaned object and leaks the memory.
https://github.com/PrefectHQ/prefect/blob/14644fa5fe129b7fa8385d58df82e6f27332b04a/src/prefect/tasks/kubernetes/job.py#L773
Daniel Nilsen
04/22/2022, 9:03 AM
Tomás Emilio Silva Ebensperger
04/22/2022, 5:28 PM
Pinakpani Mukherjee
04/23/2022, 10:32 AM
[server.database]
connection_url = "postgres://prefect:prefect@localhost:5432/postgres"
The problem I am currently having is that after running prefect server start -ep, Hasura is not able to connect properly and throws errors. Is there any way I can fix this?
Sharath Chandra
04/23/2022, 3:01 PM
Sharath Chandra
04/25/2022, 4:08 AM
spark-submit jobs. The jobs are submitted in fire-and-forget mode (spark.kubernetes.submission.waitAppCompletion=false). I have a separate monitoring task to monitor the status of each job.
However, as the number of jobs increases, the cluster gets overwhelmed.
Is there a way to throttle the task submissions on the mapped items?
Alexis Lucido
04/25/2022, 2:30 PM
Darren Burton
04/26/2022, 2:24 AM
Sharath Chandra
04/26/2022, 10:02 AM
Ron Meshulam
04/26/2022, 12:16 PM
requests.exceptions.ReadTimeout: HTTPConnectionPool(host='prefect-apollo.prefect', port=4200): Read timed out. (read timeout=15)
I thought this was a scale issue, so I've replicated the services as follows:
• agent: 3
• UI: 2
• apollo: 3
• graphql: 2
• hasura: 2
• towel: 2
We are using an external Postgres (GCP managed).
I've seen in earlier messages that I should configure:
1. PREFECT__CLOUD__REQUEST_TIMEOUT = 60 (configured it on the env in the apollo pod)
2. PREFECT_SERVER__TELEMETRY__ENABLED = false (configured it on the env in the agent pod)
3. PREFECT__CLOUD__HEARTBEAT_MODE = thread (configured it on the env in the agent pod)
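Prefect 1.x reads configuration overrides only from environment variables of the form PREFECT__SECTION__KEY, with a double underscore after PREFECT and between levels. As far as I can tell, PREFECT_SERVER__TELEMETRY__ENABLED in item 2 (single underscore after PREFECT) would therefore be ignored; the intended name is probably PREFECT__SERVER__TELEMETRY__ENABLED. A minimal checker for that naming rule; config_path is hypothetical and only roughly mirrors Prefect's own parsing:

```python
from typing import Optional, Tuple

# Prefect 1.x config overrides look like PREFECT__<SECTION>__<KEY>,
# with double underscores after the prefix and between every level.
PREFIX = "PREFECT__"


def config_path(env_var: str) -> Optional[Tuple[str, ...]]:
    """Return the lowercased config key path an env var would override, or None.

    Hypothetical helper: names that do not start with PREFECT__ are
    treated as "not a Prefect config override".
    """
    if not env_var.startswith(PREFIX):
        return None
    return tuple(part.lower() for part in env_var[len(PREFIX):].split("__"))


for name in (
    "PREFECT__CLOUD__REQUEST_TIMEOUT",
    "PREFECT_SERVER__TELEMETRY__ENABLED",
    "PREFECT__CLOUD__HEARTBEAT_MODE",
):
    print(name, "->", config_path(name))
```

A None result flags a variable name worth double-checking before looking for scaling problems elsewhere.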
I've attached the values.yaml.
Can anyone help with what else I can do, or what the problem could be?
Lauri Makinen
04/26/2022, 1:23 PM
prefect get flows
Traceback (most recent call last):
...
prefect.exceptions.ClientError: [{'message': 'unexpected null value for type "String"', 'locations': [{'line': 2, 'column': 5}], 'path': ['flow'], 'extensions': {'path': '$.selectionSet.flow.args.where._and[0].project.name._eq', 'code': 'validation-failed', 'exception': {'message': 'unexpected null value for type "String"'}}}]
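The path in that error, where._and[0].project.name._eq, shows a null project name ending up inside the GraphQL filter, and Hasura's validation rejects null for a String comparison. A sketch of the general pattern that avoids this by adding a filter only when a value was actually supplied; build_flow_filter is hypothetical and is not the Prefect CLI's code:

```python
from typing import Any, Dict, List, Optional


def build_flow_filter(
    project: Optional[str] = None, name: Optional[str] = None
) -> Dict[str, Any]:
    """Build a Hasura-style `where` clause, skipping filters whose value is None.

    Sending {"_eq": None} is what produces 'unexpected null value for
    type "String"', so None here means "do not filter on this field".
    """
    clauses: List[Dict[str, Any]] = []
    if project is not None:
        clauses.append({"project": {"name": {"_eq": project}}})
    if name is not None:
        clauses.append({"name": {"_eq": name}})
    # An empty where object matches every row, so return {} when nothing was given.
    return {"_and": clauses} if clauses else {}


print(build_flow_filter())                    # no filters at all
print(build_flow_filter(project="forecaster"))
```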
Bo
04/26/2022, 4:17 PM
ConnectTimeout(MaxRetryError("HTTPSConnectionPool(host='api.github.com', port=443): Max retries exceeded with url: *redacted* (Caused by ConnectTimeoutError(, 'Connection to api.github.com timed out. (connect timeout=15)'))
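Connect timeouts like this one, and the 502s and read timeouts reported earlier in the channel, are often transient. One generic mitigation, assuming the call is safe to repeat, is retrying with capped, jittered exponential backoff. with_backoff and its parameters are hypothetical, not a Prefect or requests API:

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def with_backoff(
    fn: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (OSError,),
    attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying on retry_on with capped, jittered exponential backoff.

    OSError is the default because the built-in ConnectionError/TimeoutError
    and requests' RequestException hierarchy all derive from it.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: re-raise the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Jitter in [delay/2, delay] keeps many callers from retrying in lockstep.
            sleep(delay * random.uniform(0.5, 1.0))
    raise AssertionError("unreachable")
```

For a GitHub fetch you would narrow retry_on to something like (requests.exceptions.ConnectTimeout,). Retrying will not help if the host is simply unreachable from the agent's network.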
madhav
04/26/2022, 8:45 PM
aws-actions/configure-aws-credentials action with Error: Credentials could not be loaded, please check your action inputs: Could not load credentials from any providers. Has anyone seen this error before when trying to package their flows into a container?
Daniel Nilsen
04/27/2022, 10:58 AM
my-flow.flow come from? I do have a flow named my-flow but no file with that name.
INFO - prefect.Docker | Building the flow's Docker storage...
Step 1/18 : FROM prefecthq/prefect:1.0.0-python3.7
---> 2005944ce1c0
Step 2/18 : ENV PYTHONPATH='$PYTHONPATH:modules/' PREFECT__USER_CONFIG_PATH='/opt/prefect/config.toml'
---> Using cache
---> 749cf2ee64e6
Step 3/18 : RUN pip install pip --upgrade
---> Using cache
---> 9f046e7adbe1
Step 4/18 : RUN pip show prefect || pip install git+https://github.com/PrefectHQ/prefect.git@1.0.0#egg=prefect[all_orchestration_extras]
---> Using cache
---> 27acea17a687
Step 5/18 : RUN pip install graphql-core
---> Using cache
---> 2c30c81c1d46
Step 6/18 : RUN mkdir -p /opt/prefect/
---> Using cache
---> 5976f2fd06b9
Step 7/18 : COPY my-flow.flow /opt/prefect/flows/my-flow.prefect
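Judging from Step 7, my-flow.flow has to exist in the Docker build context, so it is most likely generated by Docker storage during the build (a serialized copy of the flow) rather than a file you are expected to have. A simplified illustration of that serialize-then-COPY pattern, using the standard-library pickle as a stand-in for whatever serializer Prefect actually uses; slug and stage_flow are hypothetical, and Prefect's real file naming may differ:

```python
import pickle
import re
from pathlib import Path


def slug(name: str) -> str:
    """Rough slug: lowercase, with runs of non-alphanumerics replaced by '-'."""
    return re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")


def stage_flow(flow_obj: object, flow_name: str, build_dir: Path) -> str:
    """Write a serialized flow into the build context; return the matching COPY line."""
    clean = slug(flow_name)
    (build_dir / f"{clean}.flow").write_bytes(pickle.dumps(flow_obj))
    return f"COPY {clean}.flow /opt/prefect/flows/{clean}.prefect"
```

Because the file only lives in the temporary build context, not finding it in your repository is expected.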
Greg Wyne
04/28/2022, 10:17 PM
query {
  tenant {
    id
  }
}
With error:
GRAPHQL_VALIDATION_FAILED: Cannot query field "tenant" on type "Query".
However, I can run the mutation to create a tenant. Did I miss a step or a service somewhere? Thanks!
YD
04/29/2022, 5:33 PM
Egil Bugge
05/03/2022, 7:18 PM
Lana Dann
05/04/2022, 7:23 PM
flow.<method name>(<method arguments>) doesn’t work, so I assume we’d need to grab the task from flow.tasks and then run the task object.
So my follow-up question would be: what is the best way to retrieve a task by name from a Flow object?
Kevin Kho
05/04/2022, 7:50 PM
from prefect import Flow, task

@task
def abc(x):
    return x

@task
def bcd(x):
    return x

with Flow("..") as flow:
    a = abc(1)
    b = bcd(1)

print(flow.tasks)
# flow.tasks is a set, so which task comes first here is arbitrary
print(list(flow.tasks)[0].name)
And then find the one with the name you want? But I think you can just test the task directly?
Lana Dann
05/04/2022, 8:02 PM
@task
def abc(x):
    return x
and you import it with from myflow import abc, then is that a task object or just a method?
Kevin Kho
05/04/2022, 8:23 PM
abc.run()
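In Prefect 1.x a function decorated with @task, like abc, is a task object, and calling .run() on it invokes the wrapped function directly, which is what the abc.run() suggestion relies on. For retrieving a task by name, here is a small sketch assuming only that each task exposes a .name attribute, as Prefect 1.x tasks do. find_task_by_name is hypothetical (Prefect 1.x's Flow.get_tasks(name=...) may already cover this, so check the docs for your version), and SimpleNamespace objects stand in for real tasks so it runs without Prefect:

```python
from types import SimpleNamespace
from typing import Any, Iterable, Optional


def find_task_by_name(tasks: Iterable[Any], name: str) -> Optional[Any]:
    """Return the only task whose .name equals name, or None if there is none.

    Hypothetical helper: accepts any iterable of objects with a .name
    attribute, e.g. the set a Prefect 1.x flow exposes as flow.tasks.
    """
    matches = [t for t in tasks if getattr(t, "name", None) == name]
    if len(matches) > 1:
        # Names need not be unique (calling a task twice in a flow makes two copies).
        raise ValueError(f"{len(matches)} tasks are named {name!r}")
    return matches[0] if matches else None


# Stand-ins for real tasks so this runs without Prefect installed.
fake_tasks = [
    SimpleNamespace(name="abc", run=lambda x: x),
    SimpleNamespace(name="bcd", run=lambda x: x * 2),
]

print(find_task_by_name(fake_tasks, "bcd").run(21))  # 42

# With Prefect 1.x and the flow from the earlier snippet (not executed here):
# find_task_by_name(flow.tasks, "abc").run(1)
```

Raising on duplicates instead of returning the first match avoids silently picking an arbitrary element of an unordered set.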