egk
04/14/2022, 5:39 PM
Aric Huang
04/14/2022, 6:19 PM
@task(result=GCSResult(bucket=<bucket>))
method of configuring a task result, is the bucket path fixed at flow registration time? If so, is there a way it can be dynamically set at flow run time? What I'm hoping to do is have flows that can be registered to run on different clusters (using agent labels), and have their GCSResult bucket path be configured via an env var on the cluster. That way we can re-use the same flow code across different clusters but have different results buckets depending on the cluster.
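One way to get this behaviour, sketched here under the assumption that the flow is stored as a script (so the module is re-imported on the agent at run time) rather than pickled at registration: read the bucket from a cluster-specific env var at module level. The env var name below is illustrative, not from the thread.
import os
from prefect import task
from prefect.engine.results import GCSResult

# PREFECT_RESULTS_BUCKET is a hypothetical env var set per cluster; with
# script-based storage this line runs on the agent at flow load time, so
# each cluster resolves its own bucket.
RESULTS_BUCKET = os.environ.get("PREFECT_RESULTS_BUCKET", "default-bucket")

@task(result=GCSResult(bucket=RESULTS_BUCKET))
def my_task():
    return 42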
Jason
04/14/2022, 6:48 PM
Philip MacMenamin
04/14/2022, 6:50 PM
[2022-04-14 09:42:06-0600] ERROR - prefect.TaskRunner | Task 'ShellTask[0]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/blah/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/blah/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/blah/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/blah/python3.9/site-packages/prefect/tasks/shell.py", line 131, in run
    tmp.write(command.encode())
AttributeError: 'list' object has no attribute 'encode'
I have a couple of questions:
• Is there a way to tag shell_tasks such that you can see some clue as to which one failed?
• Can I get a better description of the failure?
At the moment I have
shell_task = ShellTask(log_stderr=True, return_all=True, stream_output=True)
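On the first question, one option is to give each ShellTask instance its own name at init so failures are attributable in the logs; and the traceback above comes from passing a list where ShellTask expects a single command string. A minimal sketch (the task name and command below are illustrative):
from prefect import Flow
from prefect.tasks.shell import ShellTask

# A distinct name makes "Task 'extract': Exception encountered..." lines
# point at the right instance instead of the generic ShellTask[0].
extract = ShellTask(name="extract", log_stderr=True, return_all=True, stream_output=True)

with Flow("shell-example") as flow:
    # ShellTask.run() calls command.encode(), so join any argument list
    # into one string to avoid the AttributeError above.
    out = extract(command=" ".join(["echo", "hello"]))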
egk
04/14/2022, 6:57 PM
egk
04/14/2022, 7:02 PM
Jason
04/14/2022, 7:16 PM
[14 April 2022 2:14pm]: An error occurred (InvalidParameterException) when calling the RunTask operation: No Fargate configuration exists for given values.
The weird thing is that 4096 appears to be a valid entry: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-cpu-memory-error.html. Is it possible I screwed up the definition of my Fargate cluster?
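For reference, Fargate validates cpu and memory as a pair: cpu=4096 is only accepted with memory from 8192 to 30720 MB, so a valid cpu value alone can still fail with an unsupported combination. A hedged sketch of a compatible pairing using Prefect 1's ECSRun run config (values are illustrative, not taken from the thread):
from prefect.run_configs import ECSRun

# cpu=4096 requires memory between 8192 and 30720 MB (in 1024 increments).
run_config = ECSRun(cpu="4096", memory="8192")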
Jason
04/14/2022, 8:13 PM
docker_storage = Docker(
    registry_url=environ["REGISTRY_URL"],
    dockerfile="./Dockerfile",
    image_name="{edited}-prod-platform-prefect-{project}",
    image_tag="latest",
)
It seems like each flow could demand its own image in order to separate dependencies, which would mean creating an ECR repo for each workflow? I suppose this wouldn't be that difficult to script with GitHub Actions and the aws-cli by globbing a directory for workflow names?
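Scripting that is straightforward; here is a rough sketch in Python with boto3 rather than the aws-cli (the directory layout and repo naming are assumptions):
from pathlib import Path
import boto3

ecr = boto3.client("ecr")
for flow_file in Path("flows").glob("*.py"):
    # one repo per flow module, e.g. flows/ingest.py -> prefect-ingest
    repo_name = f"prefect-{flow_file.stem}"
    try:
        ecr.create_repository(repositoryName=repo_name)
    except ecr.exceptions.RepositoryAlreadyExistsException:
        pass  # idempotent: keep existing repos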
Vipul
04/14/2022, 8:29 PM
Apoorva Desai
04/14/2022, 9:07 PM
Ken Nguyen
04/14/2022, 11:28 PM
Alexander Butler
04/14/2022, 11:55 PM
prefect deployment create
says it should create or update a deployment, but it is failing when the deployment already exists (Prefect 2.0)?
Ryan R
04/15/2022, 12:11 AM
Apoorva Desai
04/15/2022, 3:05 AM
Subhajit Roy
04/15/2022, 4:42 AM
from prefect.engine.signals import SKIP
........
........
raise SKIP('Skipping all downstream dependencies.')
With this, all the downstream tasks are skipped, and I was expecting the flow state to be Skipped as well. The downstream tasks are indeed skipped, but at the end the flow becomes Successful.
I have two questions around this:
1. Is this expected?
2. If this is expected, what's the remedy? Do I need to explicitly use a state handler on top of it to eventually mark the flow as Skipped?
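On question 2: Skipped is treated as a success by default, which is why the flow finishes Successful. One remedy is a flow-level terminal state handler that rewrites the final state when every reference task was skipped. A minimal sketch, assuming Prefect 1.x's terminal_state_handler hook:
from prefect import Flow
from prefect.engine.state import Skipped

def skip_if_all_skipped(flow, state, reference_task_states):
    # If every reference task ended Skipped, report the flow as Skipped too.
    if reference_task_states and all(s.is_skipped() for s in reference_task_states):
        return Skipped("All reference tasks were skipped.")
    return state  # otherwise keep the state Prefect computed

flow = Flow("example", terminal_state_handler=skip_if_all_skipped)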
Andrey Vinogradov
04/15/2022, 9:14 AM
Brett Naul
04/15/2022, 1:06 PM
Patrick Tan
04/15/2022, 2:09 PM
Pedro Machado
04/15/2022, 2:52 PM
Domenico Di Gangi
04/15/2022, 3:09 PM
Ahmed Ezzat
04/15/2022, 5:29 PM
prefect 1.2.0-python3.9
Docker image. Same as https://github.com/PrefectHQ/prefect/issues/3952.
For the dev team: https://cloud.prefect.io/bitthebyte/flow-run/b30223e1-5308-48fe-aa0b-9326c6e48860 (this is the stuck workflow). I already tried restarting.
Melqui de Carvalho
04/15/2022, 6:46 PM
Mohan kancherla
04/15/2022, 7:41 PM
sidravic
04/16/2022, 8:59 AM
task_definition_arn
with the containers named flow.
While I'm able to trigger the flows, the flow crashes with this error:
copilot/flow/8d31faa7f1ba File "/usr/local/lib/python3.8/importlib/__init__.py", line 127, in import_module
copilot/flow/8d31faa7f1ba return _bootstrap._gcd_import(name[level:], package, level)
copilot/flow/8d31faa7f1ba File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
copilot/flow/8d31faa7f1ba File "<frozen importlib._bootstrap>", line 991, in _find_and_load
copilot/flow/8d31faa7f1ba File "<frozen importlib._bootstrap>", line 961, in _find_and_load_unlocked
copilot/flow/8d31faa7f1ba File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
copilot/flow/8d31faa7f1ba File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
copilot/flow/8d31faa7f1ba File "<frozen importlib._bootstrap>", line 991, in _find_and_load
copilot/flow/8d31faa7f1ba File "<frozen importlib._bootstrap>", line 973, in _find_and_load_unlocked
copilot/flow/8d31faa7f1ba ModuleNotFoundError: No module named '/root/'
However, I've ensured the flows folder from my project is on the PYTHONPATH, and I can't entirely figure out what (if anything) cloudpickle is trying to do to access those flows at execution time.
Hash Lin
04/16/2022, 2:28 PM
Failed to load and execute flow run: ModuleNotFoundError("No module named '/Users/xxx/'")
Thanks for helping. 🙇
Blake
04/17/2022, 1:16 AM
MasatoShima
04/17/2022, 7:20 AM
s3_storage_block = S3StorageBlock(
    bucket="********",
    profile_name="default",
    region_name="ap-northeast-1",
)
async with get_client() as client:
    block_id = await client.create_block(
        name="********",
        block=s3_storage_block,
        block_spec_id=uuid.UUID("{12345678-1234-5678-1234-567891234567}")
    )
Ken Nguyen
04/17/2022, 7:21 PM
Alexander Butler
04/17/2022, 8:52 PM
DeploymentSpec(
    flow_location=str((FLOW_DIR / "salesforce.py").absolute()),
    flow_name="elt-salesforce",
    name="sf-production-elt-job",
    schedule=IntervalSchedule(interval=timedelta(hours=1)),
    tags=["pipeline"],
    flow_runner=DockerFlowRunner(image=f"{IMAGE_REPO}/{DBT_IMAGE}:{TAG}", stream_output=True)
)
It took a while to surface as a requirement, but I essentially have two steps.
Step one requires Docker image A to do some data pipeline work; step two needs my custom dbt Docker image B to do a transform AFTER step one. So these two dependent tasks constitute one flow, with each step on an independent Docker image.
A flow runner is configured at the deployment level, but I don't see a way to configure it at the task or subflow level. This is definitely a key requirement in the current state.
Please help!
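One workaround in the current model, sketched under the assumption that the two steps can live in separate flows: give each step its own deployment pinned to its own image, reusing the DeploymentSpec pattern above (flow names, files, and images here are illustrative):
from pathlib import Path
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import DockerFlowRunner

FLOW_DIR = Path("flows")  # hypothetical project layout

# Step 1 runs on image A, step 2 on the dbt image B; each deployment
# carries its own DockerFlowRunner since the runner is per-deployment.
DeploymentSpec(
    flow_location=str((FLOW_DIR / "extract.py").absolute()),
    flow_name="elt-salesforce-extract",
    name="sf-extract",
    flow_runner=DockerFlowRunner(image="image-a:latest", stream_output=True),
)
DeploymentSpec(
    flow_location=str((FLOW_DIR / "transform.py").absolute()),
    flow_name="elt-salesforce-transform",
    name="sf-transform",
    flow_runner=DockerFlowRunner(image="image-b:latest", stream_output=True),
)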
04/17/2022, 10:17 PMtest_param = Parameter('test_param', default="default_val")
function(test_param)
Where I got an AttributeError:
AttributeError: 'Parameter' object has no attribute
Ken Nguyen
04/17/2022, 10:17 PM
test_param = Parameter('test_param', default="default_val")
function(test_param)
Where I got an AttributeError:
AttributeError: 'Parameter' object has no attribute
I also tried function(test_param.value) to input just the string value of the parameter (what gets passed still shows as <Parameter: dbt_command>). Why is there a different behaviour between the two tasks?
@task
def print_param(param):
    logger = prefect.context.get("logger")  # standard Prefect 1.x task logger
    logger.info(param)
    logger.info(type(param))

with Flow("flow-name", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    dbt_command = Parameter('dbt_command', required=True)
    print_param(dbt_command)
    dbt_run = DbtShellTask(
        command=dbt_command,
        ...,
        dbt_kwargs={
            ...
        },
    )
Kevin Kho
04/18/2022, 2:08 AM
Ken Nguyen
04/18/2022, 5:17 AM
Kevin Kho
04/18/2022, 1:38 PM
with Flow("flow-name", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    dbt_command = Parameter('dbt_command', required=True)()
    print_param(dbt_command)
The first set of parentheses is the init and the second is the run. The run will force it.
Ken Nguyen
04/18/2022, 4:57 PM
Task 'DbtShellTask': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks/dbt/dbt.py", line 192, in run
    return super(DbtShellTask, self).run(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks/shell.py", line 131, in run
    tmp.write(command.encode())
AttributeError: 'Parameter' object has no attribute 'encode'
Kevin Kho
04/18/2022, 5:02 PM
Ken Nguyen
04/18/2022, 5:21 PM
Kevin Kho
04/18/2022, 5:22 PM
from prefect import task, Flow, Parameter
import prefect

@task
def abc(x):
    prefect.context.logger.info(x)
    return x

with Flow("..") as flow:
    test = Parameter("test", required=True)
    abc(test)

flow.run(parameters={"test": 2})
Ken Nguyen
04/18/2022, 5:28 PM
Kevin Kho
04/18/2022, 5:30 PM
Ken Nguyen
04/18/2022, 5:42 PM
Kevin Kho
04/18/2022, 5:47 PM
with Flow("..") as flow:
    test = Parameter("test", required=True)
    DbtShellTask(__init__here)(..., test)
which will work.
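Filling in Kevin's placeholder, a hedged concrete version (the init kwargs shown are illustrative): keep configuration in the constructor, but bind the command at call time so the Parameter is resolved to its string value before ShellTask.run() tries to .encode() it.
from prefect import Flow, Parameter
from prefect.tasks.dbt import DbtShellTask

with Flow("flow-name") as flow:
    dbt_command = Parameter("dbt_command", required=True)
    dbt_run = DbtShellTask(
        profile_name="default",   # illustrative init kwargs
        environment="dev",
    )(command=dbt_command)        # run-time binding resolves the Parameter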
Ken Nguyen
04/18/2022, 6:02 PM
Kevin Kho
04/18/2022, 6:12 PM