yu zeng
07/07/2022, 5:16 AM

Shivam Bhatia
07/07/2022, 6:34 AM

Alvaro Durán Tovar
07/07/2022, 7:33 AM
I'm using the RunNamespacedJob task. Because of a known bug, the way I'm obtaining the logs is via ReadNamespacedPodLogs. All of that works well. Recently I changed the pod spec to run the application as an initContainer, with some docker image to capture results as the actual container. So I have an image that generates content and logs in the initContainer, and a container that processes the produced content (uploads to GCS, basically).
Problem: the logs from the initContainer are not being captured by Prefect, only the logs from the container. Any recommendation? 🙏
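A minimal sketch of one possible workaround, assuming the kubernetes Python client is available in the flow environment and the init container is named init-generator (a hypothetical name): the client's read_namespaced_pod_log call accepts a container argument, so the init container's logs can be fetched by name and forwarded to the Prefect logger.

import prefect
from kubernetes import client, config
from prefect import task

@task
def read_init_container_logs(pod_name: str, namespace: str = "default") -> str:
    # Use in-cluster config when running inside k8s; fall back to kubeconfig.
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()

    core_v1 = client.CoreV1Api()
    # read_namespaced_pod_log takes a `container` kwarg, so a specific
    # init container's logs can be requested explicitly.
    logs = core_v1.read_namespaced_pod_log(
        name=pod_name,
        namespace=namespace,
        container="init-generator",  # hypothetical init container name
    )
    prefect.context.get("logger").info(logs)
    return logs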
Rainer Schülke
07/07/2022, 8:04 AM
weeks = Parameter("weeks", required=False, default=None)
Afterwards there is a case block where either the last actual weeks are pulled or the value is set to the provided Parameter. Locally it works like a charm, but when I execute the flow in the cloud, I get an error for the weeks task:
The following error messages were provided by the GraphQL server: INTERNAL_SERVER_ERROR: Variable "$input" got invalid value null at "input.states[0].task_run_id"; Expected non-nullable type UUID! not to be null. The GraphQL query was: mutation($input: set_task_run_states_input!) { set_task_run_states(input: $input) { states { status message id } } } The passed variables were: {"input": {"states": [{"state": {"context": {"tags": []}, "cached_inputs": {}, "message": "Starting task run.", "_result": {"__version__": "0.14.10", "type": "NoResultType"}, "__version__": "0.14.10", "type": "Running"}, "task_run_id": null, "version": null}]}}
Task 'weeks': Finished task run for task with final state: 'ClientFailed'
Do you know what might be the problem here? Why does it say I passed a null value? It should be None.
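For context, a minimal sketch of the Parameter-plus-case setup being described, with hypothetical task names; merge picks up whichever branch actually ran:

from prefect import Flow, Parameter, case, task
from prefect.tasks.control_flow import merge

@task
def get_last_actual_weeks():
    # Hypothetical stand-in for pulling the most recent weeks.
    return [24, 25, 26]

@task
def weeks_provided(weeks):
    return weeks is not None

@task
def passthrough(weeks):
    return weeks

with Flow("weeks-example") as flow:
    weeks = Parameter("weeks", required=False, default=None)
    provided = weeks_provided(weeks)

    with case(provided, True):
        given = passthrough(weeks)
    with case(provided, False):
        computed = get_last_actual_weeks()

    # merge returns the result of whichever branch ran
    final_weeks = merge(given, computed)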
Dennis Hinnenkamp
07/07/2022, 9:06 AM

Christian Vogel
07/07/2022, 10:59 AM
(begin_task_run pid=141324) ImportError: cannot import name 'SubprocessFlowRunner' from partially initialized module 'prefect.flow_runners' (most likely due to a circular import) (/home/christian/Documents/ray_and_prefect/env/lib/python3.9/site-packages/prefect/flow_runners/__init__.py)
I am using the following dependencies: prefect==2.0b7 prefect-ray==0.1.0 ray==1.13.0
Apparently I am doing something wrong with my dependencies or with how I am importing them. Do you have any idea?

Marcin Grzybowski
07/07/2022, 12:10 PM

Robert Kowalski
07/07/2022, 1:06 PM
Why does the map function fail, but work when map is replaced with bind?
from prefect import task, Flow, unmapped

@task
def first():
    return range(10)

@task
def second():
    return True

@task
def third(numbers):
    print(numbers)

with Flow(name='test') as flow:
    numbers = first()
    second_result = second()
    # third.bind(numbers=numbers, upstream_tasks=[second_result])
    third.map(numbers=unmapped(numbers), upstream_tasks=[second_result])

flow.run()
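One guess, offered tentatively: with every argument wrapped in unmapped there is no iterable left for map to iterate over, so Prefect cannot determine a map length. If the intent is one run of third per element, a sketch that maps over numbers directly:

with Flow(name='test') as flow:
    numbers = first()
    second_result = second()
    # Map over the elements of `numbers`; the unmapped dependency makes
    # every mapped child wait on the single `second` task.
    third.map(numbers=numbers, upstream_tasks=[unmapped(second_result)])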
Kyle McChesney
07/07/2022, 2:38 PM

Keith Veleba
07/07/2022, 4:29 PM
Unexpected error: AttributeError("'S3Result' object has no attribute 'upload_options'")
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 930, in get_task_run_state
result = self.result.write(value, **formatting_kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/s3_result.py", line 89, in write
ExtraArgs=self.upload_options,
AttributeError: 'S3Result' object has no attribute 'upload_options'
Attached is one of the flows that are failing. Does the task running code record execution state back to the storage bucket?
Thanks in advance!

Britt Evans
07/07/2022, 4:35 PM

Rajvir Jhawar
07/07/2022, 6:55 PM
kubectl configured to connect to a cluster

Xavier Witdouck
07/07/2022, 7:04 PM

Matthew Seligson
07/07/2022, 7:18 PM

Michal Baumgartner
07/07/2022, 7:29 PM
(prefect.storage.Module) would be available in the future releases. We're running a Prefect + Dask setup on k8s from a monorepo (i.e. agent and Dask worker deployments run the same docker image & tag), where flows are sourced from Python modules (and registered before agents start up during a new release), therefore I'd prefer not to use any object storage for storing flows or results if possible.
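For reference, a minimal sketch of module-based storage as it exists in Prefect 1.x, assuming the flow lives in an importable package named mypackage.flows (a hypothetical path):

from prefect import Flow
from prefect.storage import Module

with Flow("module-stored-flow") as flow:
    ...  # tasks as usual

# Only an import path is stored; at run time the agent imports
# mypackage.flows and looks the flow up by name, so the shared docker
# image must have the package installed. No object storage is involved.
flow.storage = Module("mypackage.flows")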
Adam
07/07/2022, 7:46 PM

Alex Tam
07/07/2022, 7:54 PM

redsquare
07/07/2022, 7:58 PM

Ifeanyi Okwuchi
07/07/2022, 8:00 PM
Unexpected error: KeyError(0)
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
executors.prepare_upstream_states_for_mapping(
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 682, in prepare_upstream_states_for_mapping
value = upstream_state.result[i]
KeyError: 0
Paul Stark
07/07/2022, 10:08 PM
snowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)

sql_queries = cfg.sql_query.split(';')

with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query(
        account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
        user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
        password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
        warehouse=cfg.snowflake_warehouse,
    ).map(query=sql_queries)
sql_queries is getting put into a list, since I can loop through it…. Any thoughts on what I am doing incorrectly?
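A guess at the issue: calling the snowflake_query task instance inside the flow and then calling .map on its result maps over that call's output rather than binding a mapped run. A sketch of mapping the task itself, with the non-mapped arguments wrapped in unmapped:

from prefect import unmapped

with Flow(getenv('FLOW_NAME')) as flow:
    run_snowflake_query = snowflake_query.map(
        query=sql_queries,
        account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
        user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
        password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
        warehouse=unmapped(cfg.snowflake_warehouse),
    )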
Jake
07/07/2022, 10:14 PM
Calling .run() on the flow that gets returned works fine, and registering the flow seems to be fine too (using k8s run). However, when we try to actually run it, we get the following error:
Failed to load and execute flow run: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'build_index\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
Am I missing something obvious?
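One common cause of this error, sketched with hypothetical paths: the flow is unpickled inside the k8s job, so the build_index module has to be importable there too, e.g. by baking it into the storage image and putting it on PYTHONPATH:

from prefect.storage import Docker

# Copy the local module into the image and make it importable so the
# flow can be unpickled inside the container (paths are hypothetical).
flow.storage = Docker(
    files={"/local/path/to/build_index.py": "/app/build_index.py"},
    env_vars={"PYTHONPATH": "/app"},
)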
Cole Murray
07/08/2022, 3:27 AM
from typing import List
from uuid import uuid4

from prefect.client import get_client
from prefect.flow_runners import SubprocessFlowRunner
from prefect.flows import Flow
from prefect.orion.schemas.data import DataDocument
from prefect.orion.schemas.schedules import CronSchedule

from workflow_etl.flows.flow import hello_world

async def main():  # must be async: the client's create_* methods are coroutines
    prefect_client = get_client()
    schedules = []  # TODO
    for schedule in schedules:
        flow_id = await prefect_client.create_flow(hello_world)
        await prefect_client.create_deployment(
            flow_id=flow_id,
            name=schedule.id,
            schedule=CronSchedule(cron="0 * * * * *"),
            parameters={
                **schedule.workflow_params
            },
            tags={
                'owner_id': schedule.owner_id,
            },
            flow_runner=SubprocessFlowRunner(),
            flow_data=  # UNKNOWN WHAT TO PUT HERE
        )
I'm a bit stuck on what to put in the flow_data argument. Has anyone tried this / have a link to a sample?

Arnas
07/08/2022, 8:06 AMfrom prefect import task, Flow, Parameter
@task
def task_function(in_list_size=10, out_list_size=4):
in_list = list(range(in_list_size))
print(f">>> Input list: {in_list}")
out_list = list(range(out_list_size))
print(f">>> Output list: {out_list}")
diff_list = [i for i in in_list if i not in out_list]
print(f">>> Result list: {diff_list}")
return diff_list
with Flow("Test Flow") as flow:
in_list_size = Parameter("input_list_size", default=10)
out_list_size = Parameter("output_list_size", default=4)
result = task_function(in_list_size=in_list_size, out_list_size=out_list_size)
flow.register(project_name="tutorial")
if __name__ == "__main__":
flow.run()
Trying to figure out where the problem is - guessing something is wrong on the Local Agent side?

Pierre-Edouard
07/08/2022, 9:15 AM

Tom Klein
07/08/2022, 9:34 AM
We want to use the LocalDaskExecutor for our flow, but be able to limit the parallelisation (since each task requires a lot of resources when run) to, for example, only two tasks at a time. Is that possible? I read the docs but still don't fully understand whether we have to use a DaskExecutor for this, and if so, would it by default run locally (if we don't give it any other config)?
And kind of tangential (and not directly related to Prefect): is there some advantage to something like an AWS Fargate cluster for Dask over a k8s Dask cluster? Is the former just easier to set up, or something?
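A minimal sketch, assuming Prefect 1.x: LocalDaskExecutor takes a num_workers argument, which caps concurrency without needing a full DaskExecutor (and a DaskExecutor given no address does spin up a temporary local cluster by default):

from prefect import Flow
from prefect.executors import LocalDaskExecutor

with Flow("resource-heavy-flow") as flow:
    ...  # tasks as usual

# "threads" (or "processes") scheduler, at most two tasks at a time.
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=2)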
Tarek
07/08/2022, 9:50 AM

Andreas
07/08/2022, 9:51 AM
09:47:51.189 | INFO | prefect.engine - Flow run 'shiny-falcon' received invalid parameters and is marked as failed
However, this information is quite limited. Is there a way to get more information about which of the parameters failed the pydantic check?

Keith
07/08/2022, 2:00 PM
I initially hit an OOMError, but got around that by increasing the memory requested from k8s. Now I am stuck on the ZombieKiller stopping tasks that should be reporting back status, since they continue to produce logs with the message "No heartbeat detected from the remote task; marking the run as failed."
I have attempted to set HEARTBEAT_MODE to thread and off, via the config.toml file as well as in the KubernetesRun environment variables, but no matter the combination I set up, I still run into heartbeat errors killing a process. I am curious whether there is a way to send a heartbeat from within the code base, or if there is another approach I should take to get around my timeout issue. Cheers!
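For reference, a sketch of the configuration being attempted, assuming Prefect 1.x, where the cloud.heartbeat_mode setting maps to the PREFECT__CLOUD__HEARTBEAT_MODE environment variable:

from prefect.run_configs import KubernetesRun

# "thread" runs heartbeats in a thread instead of a subprocess; "off"
# disables them entirely (at the cost of losing zombie detection).
flow.run_config = KubernetesRun(
    env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"},
)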
Bogdan Serban
07/08/2022, 2:04 PM

Marcin Grzybowski
07/08/2022, 2:42 PM