Powered by Linen
prefect-community
  • e

    egk

    04/14/2022, 5:39 PM
    Is it possible to return a Socket from a task?
    k
    • 2
    • 16
  • a

    Aric Huang

    04/14/2022, 6:19 PM
    Hi, I'm looking into running Prefect on Kubernetes clusters in different regions, and had a question about handling GCSResults. If we use the
    @task(result=GCSResult(bucket=<bucket>))
    method of configuring a task result, is the bucket path fixed at flow registration time? If so, is there a way it can be dynamically set at flow run time? What I'm hoping to do is have flows that can be registered to run on different clusters (using agent labels), and have their GCSResult bucket path be configured via an env var on the cluster. That way we can re-use the same flow code across different clusters but have different results buckets depending on the cluster.
    a
    k
    • 3
    • 9
  • j

    Jason

    04/14/2022, 6:48 PM
    Do you need to set the image on ECSRun for the task to use a specific image, or will it associate the Docker storage pushed to ECR in the same flow as the base image to use?
    k
    • 2
    • 4
  • p

    Philip MacMenamin

    04/14/2022, 6:50 PM
    Cryptic errors in ShellTask - Hi, I'm running a flow with a number of shellTasks, and encountering errors like:
    [2022-04-14 09:42:06-0600] ERROR - prefect.TaskRunner | Task 'ShellTask[0]': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/blah/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/blah/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/blah/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
        return run_method(self, *args, **kwargs)
      File "/blah/python3.9/site-packages/prefect/tasks/shell.py", line 131, in run
        tmp.write(command.encode())
    AttributeError: 'list' object has no attribute 'encode'
    I have a couple of questions:
    • Is there a way to tag shell_tasks such that you can see some clue as to which one failed?
    • Can I get a better description of the failure?
    At the moment I have
    shell_task = ShellTask(log_stderr=True, return_all=True, stream_output=True)
    k
    • 2
    • 17
  • e

    egk

    04/14/2022, 6:57 PM
    Is it a valid use-case to use Prefect for the sake of having a managed way of running tasks rather than "data pipelines"? I just want the workflow aspect of it.
    k
    • 2
    • 6
  • e

    egk

    04/14/2022, 7:02 PM
    Can I run Prefect fully on-premise without cloud features whatsoever?
    k
    p
    • 3
    • 6
  • j

    Jason

    04/14/2022, 7:16 PM
    So I have a Fargate ECS cluster, but when I try to scale up the cpu count with ECSRun() I get:
    [14 April 2022 2:14pm]: An error occurred (InvalidParameterException) when calling the RunTask operation: No Fargate configuration exists for given values.
    . The weird thing is that 4096 appears to be a valid entry: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-cpu-memory-error.html. Is it possible I screwed up the definition of my fargate cluster?
    k
    • 2
    • 6
  • j

    Jason

    04/14/2022, 8:13 PM
    What are the best practices for managing Docker image names?
    docker_storage = Docker(
        registry_url=environ["REGISTRY_URL"],
        dockerfile="./Dockerfile",
        image_name="{edited}-prod-platform-prefect-{project}",
        image_tag="latest",
    )
    It seems like each flow could demand its own image in order to separate dependencies, which would mean creating an ECR repo for each workflow? I suppose this wouldn't be that difficult to script with Github Actions and aws-cli globbing a directory for workflow names?
    k
    m
    • 3
    • 4
  • v

    Vipul

    04/14/2022, 8:29 PM
    Hey, quick question on Prefect Orion: will task caching based on the persisted result be supported in Prefect Orion? I mean the one that is already available in Prefect 1.0 with @task(target="{task-name}")
    k
    a
    • 3
    • 17
  • a

    Apoorva Desai

    04/14/2022, 9:07 PM
    Is there any documentation on which prefect docker tag has what extras?
    k
    • 2
    • 1
  • k

    Ken Nguyen

    04/14/2022, 11:28 PM
    Are there any example flows involving pulling from different branches of the same repo using mapping?
    k
    • 2
    • 78
  • a

    Alexander Butler

    04/14/2022, 11:55 PM
    prefect deployment create
    says it should create or update a deployment but it is failing when the deployment exists (prefect 2.0) ?
    k
    • 2
    • 20
  • r

    Ryan R

    04/15/2022, 12:11 AM
    Hi, I'm having some problems getting around CORS issues when making Prefect 2.0 accessible to another host on the LAN while running in a Docker container. Details in thread per @Kevin Kho
    k
    • 2
    • 13
  • a

    Apoorva Desai

    04/15/2022, 3:05 AM
    Hello! All my prefect flows are getting scheduled but not running. Any recommendations on where I can start to debug this?
    k
    • 2
    • 1
  • s

    Subhajit Roy

    04/15/2022, 4:42 AM
    Hi folks, in a Prefect flow, for one particular task, I am trying to raise a SKIP signal
    from prefect.engine.signals import SKIP
    ........
    ........
    raise SKIP('Skipping all downstream dependencies.')
    With this, all the following tasks are skipped, and I was expecting the flow state to be
    skipped
    . The following tasks are skipped, but at the end the flow becomes
    successful
    . I have two questions: 1. Is this expected? 2. If it is, what's the remedy? Do I need to explicitly use a state handler on top of it to make the flow
    skipped
    eventually?
    k
    • 2
    • 14
  • a

    Andrey Vinogradov

    04/15/2022, 9:14 AM
    Hi! Is it possible to increase max scheduled flows in prefect? I found the “max_scheduled_runs_per_flow” parameter, but it seems that it is only available in prefect server, not prefect.
    a
    k
    • 3
    • 14
  • b

    Brett Naul

    04/15/2022, 1:06 PM
    we're still having issues with logs not loading, sounds like there was an incident yesterday but prefect.status.io says it should be resolved now? any known issues that are still being worked on?
    m
    c
    • 3
    • 16
  • p

    Patrick Tan

    04/15/2022, 2:09 PM
    Question on state_handler, is there anyway to pass arguments to state_handler? I want to update database table when flow or task fails. The arguments are database config information and credentials
    k
    • 2
    • 6
  • p

    Pedro Machado

    04/15/2022, 2:52 PM
    Hi everyone. Is there a migration guide that discusses the changes between 1.0 and Orion? I have a couple of implementations on 1.0 that we'd like to migrate soon after Orion hits GA. One runs on Azure Kubernetes and the other one on AWS ECS. We are using Docker storage for both. I was wondering what you recommend to understand how the different concepts relate in both versions. Thanks!
    👀 1
    k
    • 2
    • 1
  • d

    Domenico Di Gangi

    04/15/2022, 3:09 PM
    Hi all, I am wondering if its possible to change parameters of a deployed flow in the Orion UI and trigger a quick run with the new parameters. That would be extremely useful, for example in the process exploring different configurations and sizes when setting up third party cloud services
    k
    • 2
    • 3
  • a

    Ahmed Ezzat

    04/15/2022, 5:29 PM
    I'm having problems with tasks stuck in the "Running" state. However, when I inspected the running workers, everything seemed to be running fine and results are submitted to my database, although nothing is updating the Prefect state. I'm currently using "prefect cloud" with
    prefect 1.2.0-python3.9
    docker image. same as https://github.com/PrefectHQ/prefect/issues/3952 for the dev team: https://cloud.prefect.io/bitthebyte/flow-run/b30223e1-5308-48fe-aa0b-9326c6e48860 (this is the stuck workflow) I already tried restarting
    k
    • 2
    • 11
  • m

    Melqui de Carvalho

    04/15/2022, 6:46 PM
    Hello guys, I would like to know if in Prefect 2.0 it is possible to request the execution of a specific task and all its dependent upstream tasks. Thanks.
    k
    • 2
    • 6
  • m

    Mohan kancherla

    04/15/2022, 7:41 PM
    Hello guys, we use VictorOps as our incident management tool, and I set up a cloud hook on the flow to send notifications to VictorOps via a web URL. I am trying to test that out, but it seems I am not receiving messages in our VictorOps. Does anyone have any idea about that?
    k
    m
    m
    • 4
    • 11
  • s

    sidravic

    04/16/2022, 8:59 AM
    Hello folks, I'm running flows on Prefect 1.2 using ECSRun and my setup uses the approach as described in this issue under the Conclusions pt. 5. It uses the
    task_definition_arn
    with the containers named as
    flow
    While I'm able to trigger the flows, the flow crashes with the error
    copilot/flow/8d31faa7f1ba   File "/usr/local/lib/python3.8/importlib/__init__.py", line 127, in import_module
    copilot/flow/8d31faa7f1ba     return _bootstrap._gcd_import(name[level:], package, level)
    copilot/flow/8d31faa7f1ba   File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
    copilot/flow/8d31faa7f1ba   File "<frozen importlib._bootstrap>", line 991, in _find_and_load
    copilot/flow/8d31faa7f1ba   File "<frozen importlib._bootstrap>", line 961, in _find_and_load_unlocked
    copilot/flow/8d31faa7f1ba   File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
    copilot/flow/8d31faa7f1ba   File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
    copilot/flow/8d31faa7f1ba   File "<frozen importlib._bootstrap>", line 991, in _find_and_load
    copilot/flow/8d31faa7f1ba   File "<frozen importlib._bootstrap>", line 973, in _find_and_load_unlocked
    copilot/flow/8d31faa7f1ba ModuleNotFoundError: No module named '/root/'
    However, I've ensured the flows folder from my project is on the PYTHONPATH, and I can't entirely figure out what (if anything) cloudpickle is trying to do to access those flows at the time of execution.
    a
    • 2
    • 7
  • h

    Hash Lin

    04/16/2022, 2:28 PM
    Hi, I'm new to Prefect. It's fantastic to deploy pipelines by using Python code. However, I faced some issues when deploying a pipeline to Prefect. Here's my scenario.
    • I want to run Prefect Agent in the K8s cluster, and I successfully plugged the agent into our cluster.
    • I built a docker image in Gitlab CI and pushed the docker image to ECR. Then, I wrote the Dockerfile with reference from here.
    • I wrote a flow script and registered it to the Prefect cloud by using my local computer.
    Unfortunately, after running the pipeline, I got some errors: the Python module was not found. As a result, I can't successfully import the module that I wrote in the project. And here's my flow script.
    Failed to load and execute flow run: ModuleNotFoundError("No module named '/Users/xxx/'")
    Thanks for helping. 🙇
    k
    t
    m
    • 4
    • 29
  • b

    Blake

    04/17/2022, 1:16 AM
    Hiya! Quick question on best practices in Prefect 1. With a flow-of-flows, is there an ideal way to pass values down to child flows? In particular, secrets? Parameters seem fine for everything else.
    k
    • 2
    • 6
  • m

    MasatoShima

    04/17/2022, 7:20 AM
    Hi, I’m trying Prefect Orion. I am planning to use Amazon S3 bucket for Storage. I wrote the following code. However, I get an error. I have not created the Storage yet… Am I doing something wrong? I’d be glad to hear from you if anyone has any ideas!
    s3_storage_block = S3StorageBlock(
        bucket="********",
        profile_name="default",
        region_name="ap-northeast-1",
    )
    
    async with get_client() as client:
        block_id = await client.create_block(
            name="********",
            block=s3_storage_block,
            block_spec_id=uuid.UUID("{12345678-1234-5678-1234-567891234567}")
        )
    a
    • 2
    • 3
  • k

    Ken Nguyen

    04/17/2022, 7:21 PM
    If I have 2 parameters, one as a list to be mapped, and one as a value that should be unmapped, how can I concat the mapped parameters onto the unmapped one?
    k
    a
    • 3
    • 4
  • a

    Alexander Butler

    04/17/2022, 8:52 PM
    So I have this deployment in Prefect 2.0:
    DeploymentSpec(
        flow_location=str((FLOW_DIR / "salesforce.py").absolute()),
        flow_name="elt-salesforce",
        name="sf-production-elt-job",
        schedule=IntervalSchedule(interval=timedelta(hours=1)),
        tags=["pipeline"],
        flow_runner=DockerFlowRunner(image=f"{IMAGE_REPO}/{DBT_IMAGE}:{TAG}", stream_output=True)
    )
    And it took a while to come to me as a requirement, but I essentially have 2 steps. Step 1 requires docker image A to do some data pipeline stuff; step 2 needs my custom dbt docker image B to do some transform AFTER step 1. So these two dependent tasks constitute one flow with each step on independent docker images. A flow runner is configured at a deployment level, but I don't see a way to configure it at the task or subflow level. Definitely a key req in current state. Please help!
    👀 1
    k
    m
    a
    • 4
    • 6
  • k

    Ken Nguyen

    04/17/2022, 10:17 PM
    How do I get the value of the input of my parameters? I was doing something like this in my flow:
    test_param = Parameter('test_param', default="default_val")
    function(test_param)
    Where I got an AttributeError:
    AttributeError: 'Parameter' object has no attribute
    k
    • 2
    • 23
k

Ken Nguyen

04/17/2022, 10:17 PM
How do I get the value of the input of my parameters? I was doing something like this in my flow:
test_param = Parameter('test_param', default="default_val")
function(test_param)
Where I got an AttributeError:
AttributeError: 'Parameter' object has no attribute
I believe I would like to do something like
function(test_param.value)
to input just the string value of the parameter
A bit puzzled by this. With the below code example, the parameter going into print_param is supposedly the string value that I entered (output of that function is the string + the type is a str type). But with the DbtShellTask, I believe it’s passing in the parameter object (
<Parameter: dbt_command>
). Why is there a different behaviour between the two tasks?
@task
def print_param(param):
    logger.info(param)
    logger.info(type(param))

with Flow("flow-name", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    dbt_command = Parameter('dbt_command', required=True)
    print_param(dbt_command)
    
    dbt_run = DbtShellTask(
        command = dbt_command,
        ...,
        dbt_kwargs={
            ...
        },
    )
k

Kevin Kho

04/18/2022, 2:08 AM
Parameters are JSON-serialized because they go through the API, so they don’t accept Python classes. Does that make sense? So only basic Python types can be accepted. Prefect 2.0 will coerce it if you provide a schema for it
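The point about JSON can be illustrated with a small sketch that uses only the standard library and no Prefect code. `DbtCommand` and `survives_json` are hypothetical names made up for this example; the sketch only shows which kinds of values can pass through a JSON round trip, not how Prefect itself handles parameters.

```python
import json


class DbtCommand:
    """Hypothetical custom class, used only to show what JSON cannot carry."""

    def __init__(self, text):
        self.text = text


def survives_json(value):
    """Return True if value round-trips through JSON unchanged."""
    try:
        return json.loads(json.dumps(value)) == value
    except TypeError:
        # json.dumps raises TypeError for objects it cannot serialize.
        return False


# Strings, numbers, booleans, None, lists and dicts all pass through JSON.
basic_values = ["dbt run", 2, 1.5, True, None, ["a", "b"], {"k": "v"}]
basic_ok = all(survives_json(v) for v in basic_values)

# An instance of an arbitrary Python class does not.
custom_ok = survives_json(DbtCommand("dbt run"))
```

Under these assumptions, a parameter value such as the string "dbt run" is safe to send, while a custom object would have to be reduced to basic types first.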
k

Ken Nguyen

04/18/2022, 5:17 AM
I think I understand what you’re saying, but I don’t understand why that would lead to parameters being a different object type when passed onto different tasks. I guess more confusing is how the parameter seemingly remains a parameter object when passed into DbtShellTask.
k

Kevin Kho

04/18/2022, 1:38 PM
Ah ok I know what you mean. Do this?
with Flow("flow-name", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    dbt_command = Parameter('dbt_command', required=True)()
    print_param(dbt_command)
The first set of parentheses is the init and the second one is the run. The run will force it.
k

Ken Nguyen

04/18/2022, 4:57 PM
I tried that and it seems like dbt_command is still being passed to DbtShellTask as a parameter object based on the below error message:
Task 'DbtShellTask': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks/dbt/dbt.py", line 192, in run
    return super(DbtShellTask, self).run(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks/shell.py", line 131, in run
    tmp.write(command.encode())
AttributeError: 'Parameter' object has no attribute 'encode'
k

Kevin Kho

04/18/2022, 5:02 PM
That’s weird I can test in a bit
Ok, I re-read. Why are you using a function inside the Flow to get the param value? Functions will execute immediately, before the param has a value, during Flow registration time. It needs to be a task to defer the execution
The Parameter value doesn’t exist at flow registration time
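The difference between running immediately and deferring can be shown with a toy model in plain Python. Nothing here is Prefect's implementation: `Placeholder`, `DeferredCall` and `describe` are hypothetical names invented for the sketch, and the parameter lookup is deliberately simplified.

```python
class Placeholder:
    """Stand-in for a parameter: it has a name but no value until run time."""

    def __init__(self, name):
        self.name = name


def describe(value):
    """Report the type name of whatever it is handed."""
    return type(value).__name__


class DeferredCall:
    """Records a function and its argument now, and calls the function later."""

    def __init__(self, fn, arg):
        self.fn = fn
        self.arg = arg

    def run(self, parameters):
        # The placeholder is swapped for the real value only at run time.
        if isinstance(self.arg, Placeholder):
            value = parameters[self.arg.name]
        else:
            value = self.arg
        return self.fn(value)


param = Placeholder("dbt_command")

# Calling the function directly happens at "registration time":
# it receives the placeholder object itself.
seen_at_build_time = describe(param)

# Wrapping the call defers it until the parameter values are known.
deferred = DeferredCall(describe, param)
seen_at_run_time = deferred.run({"dbt_command": "dbt run"})
```

In this model the direct call sees a `Placeholder`, while the deferred call sees the `str` supplied at run time, which matches the behaviour described in the messages above.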
k

Ken Nguyen

04/18/2022, 5:21 PM
Sorry, my initial message said function, but it’s actually a task. To be specific, it’s the DbtShellTask
Wait, that IS a task right?
k

Kevin Kho

04/18/2022, 5:22 PM
Ah yeah. Task should just work off the shelf tho?
from prefect import task, Flow, Parameter
import prefect

@task
def abc(x):
    prefect.context.logger.info(x)
    return x

with Flow("..") as flow:
    test = Parameter("test", required=True)
    abc(test)

flow.run(parameters={"test": 2})
Ohh I understand
DbtShellTask has an init and a run. You are passing the parameter to the init so it’s evaluated at build time. Only the run is deferred
k

Ken Nguyen

04/18/2022, 5:28 PM
Ohhh
What can I do to prevent that?
k

Kevin Kho

04/18/2022, 5:30 PM
Only stuff in the run method can be parameterized
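A rough, Prefect-free sketch of why the init argument fails while the run argument works. `ToyShellTask`, `Placeholder` and `execute` are hypothetical stand-ins written for this example; they only mimic the init/run split discussed in the thread and are not the real `ShellTask` or task runner.

```python
class Placeholder:
    """Stand-in for a parameter: it has a name but no value until run time."""

    def __init__(self, name):
        self.name = name


class ToyShellTask:
    """Toy task with the same init/run split as the task in the thread."""

    def __init__(self, command=None):
        # Stored as-is at build time; nothing ever resolves it later.
        self.command = command

    def run(self, command=None):
        # Fall back to the init value when no run-time command is given.
        command = command if command is not None else self.command
        return command.encode()


def execute(task, run_kwargs, parameters):
    """Toy runner: resolve placeholders in the run kwargs, then call run()."""
    resolved = {
        key: parameters[value.name] if isinstance(value, Placeholder) else value
        for key, value in run_kwargs.items()
    }
    return task.run(**resolved)


param = Placeholder("dbt_command")
params = {"dbt_command": "dbt run"}

# Placeholder passed to init: run() ends up calling .encode() on the placeholder.
try:
    execute(ToyShellTask(command=param), {}, params)
    init_error = None
except AttributeError as exc:
    init_error = str(exc)

# Placeholder passed to run: the toy runner swaps in the real string first.
run_result = execute(ToyShellTask(), {"command": param}, params)
```

The first call fails with the same kind of AttributeError as in the traceback earlier in the thread, because the runner only looks at what is passed to `run`.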
k

Ken Nguyen

04/18/2022, 5:42 PM
Can I create a task that takes in the parameter and outputs it again, then feed that task output into DbtShellTask? Does that defer the parameter to run time?
k

Kevin Kho

04/18/2022, 5:47 PM
But that’s effectively just passing it into the run right?
with Flow("..") as flow:
    test = Parameter("test", required=True)
    DbtShellTask(__init__here)(..., test)
which will work
k

Ken Nguyen

04/18/2022, 6:02 PM
OH
I see!
Okay, thank you for your help, I actually understood the root cause of the issue and learnt a lot more about prefect!
k

Kevin Kho

04/18/2022, 6:12 PM
Nice work!
🙌 1