prefect-community
  • m

    Mary Clair Thompson

    10/02/2020, 2:11 PM
    Does anyone have experience running the server on a machine managed by puppet? I'm running into an issue where seemingly unrelated puppet updates are breaking the back end.
    d
    • 2
    • 64
  • j

    Jacques

    10/02/2020, 2:27 PM
    We are using prefect core and now trying to create a new Environment - should calling flow.run() use the Executor defined in the environment if I declared the flow with something like Flow("myflow", environment=MyEnvironment()), or is that only used by prefect agents?
    d
    j
    • 3
    • 12
  • r

    Ralph Willgoss

    10/02/2020, 2:55 PM
    Hi, I was wondering if someone can help me with a Prefect logging question. I've got logs working and have configured an additional logger using config.toml. 1. How do I suppress Prefect logs to, say, WARN level and keep mine at INFO level? Right now the logging level controls all logs. 2. I currently see some weird behaviour with duplicated logging statements: notice the duplication below - I didn't specify the formatting for the bottom (second) log
    [2020-10-02 14:32:01] INFO - (13104) task_runner.run | Task 'calculate_payoffs': finished task run for task with final state: 'Success'
    INFO:prefect.TaskRunner:Task 'calculate_payoffs': finished task run for task with final state: 'Success'
    d
    • 2
    • 20
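    A minimal sketch of one way to split the two levels, assuming Prefect's level is controlled via config.toml / env var and your own logger is plain standard-library logging (the logger name my_app and the handler choice are illustrative):

    import logging
    import os

    # Quiet Prefect's own loggers to WARNING; this mirrors [logging] level = "WARNING"
    # in config.toml and must be set before prefect is imported to take effect.
    os.environ["PREFECT__LOGGING__LEVEL"] = "WARNING"

    # Keep an application logger at INFO, independent of Prefect's level.
    app_logger = logging.getLogger("my_app")
    app_logger.setLevel(logging.INFO)
    app_logger.addHandler(logging.StreamHandler())

    # The duplicated "INFO:prefect.TaskRunner:..." line is typically the same record rendered
    # a second time by a handler attached to the root logger; either avoid adding root handlers
    # or stop propagation from Prefect's logger as below.
    logging.getLogger("prefect").propagate = False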
  • h

    Hui Zheng

    10/02/2020, 4:43 PM
    Hello, I have a question about prefect context. It seems a powerful way to share common variables and objects among tasks. Is it possible to add new objects or variables inside the flow definition which can then be used by the tasks later? https://docs.prefect.io/core/concepts/execution.html#modifying-context-at-runtime such as
    with Flow('Flow with new context') as flow:
        # Could we add new objects to prefect.context here?
    
        task_1_use_new_context_object()
    d
    • 2
    • 4
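    For reference, a minimal sketch of the pattern the linked docs describe - context is supplied around the run call rather than assigned inside the Flow definition block (my_new_value is an illustrative key):

    import prefect
    from prefect import task, Flow

    @task
    def task_1_use_new_context_object():
        # values placed in prefect.context at run time are visible inside tasks
        return prefect.context.get("my_new_value")

    with Flow("Flow with new context") as flow:
        task_1_use_new_context_object()

    # context is set when the flow is run, not while it is being defined
    with prefect.context(my_new_value=42):
        flow.run()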
  • h

    Hammad A

    10/02/2020, 4:55 PM
    Why does it appear to be super difficult to execute each task of a flow in a separate docker container?
    d
    j
    s
    • 4
    • 33
  • v

    Vincent

    10/02/2020, 6:19 PM
    I am trying to understand the differences between the TaskRunner and the CloudTaskRunner. It seems that these two runners schedule tasks slightly differently, i.e. I have a flow that runs in parallel locally, but fails to run in parallel when using the DaskKubernetesEnvironment. Is there a summary of the differences in scheduling?
    c
    • 2
    • 6
  • b

    Brett Naul

    10/02/2020, 6:31 PM
    fyi just noticed a few dead doc links to https://docs.prefect.io/core/task_library/ still sprinkled around
    d
    • 2
    • 1
  • k

    Kyle Pierce

    10/02/2020, 6:46 PM
    I have a task that takes two parameters and outputs a list. Whenever I try to register it, I get this error:
    TypeError: 'FunctionTask' object is not iterable
    I don't know if I'm overcomplicating this.
    👀 1
    d
    • 2
    • 17
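    One common cause of that TypeError is iterating or unpacking a task's return value while the flow is being defined, when the object is still a Task rather than a list. A minimal sketch of one workaround, assuming the task returns a fixed-length tuple so nout applies (task names are illustrative):

    from prefect import task, Flow

    # nout tells Prefect the task returns exactly two values,
    # so the result can be unpacked inside the Flow block
    @task(nout=2)
    def split(a, b):
        return a + b, a - b

    @task
    def use(value):
        print(value)

    with Flow("unpack-example") as flow:
        total, diff = split(3, 2)
        use(total)
        use(diff)

    If the task instead returns a variable-length list, downstream tasks usually consume it whole or via .map() rather than iterating it at build time.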
  • e

    Eric

    10/04/2020, 6:28 PM
    Any thoughts on this error? AttributeError: 'FunctionTask' object has no attribute task_run_name. Did you call this object within a function that should have been decorated with @prefect.task? This is how I'm setting up the task decorator:
    @task(name="DF", log_stdout=True)
    c
    • 2
    • 1
  • m

    ms16

    10/05/2020, 12:20 AM
    Hi all, how do I get the prefect CLI auto-complete in zsh or bash?
  • m

    Max Lei

    10/05/2020, 12:45 AM
    Hi All, how do I get the default loggers' output into the Prefect UI? Right now I have a function that sets up logging, for example:
    def setup_logs():
        # Example config (excerpt)
        logging_config = {
            'root': {
                'level': 'DEBUG',
                'handlers': [
                    'console',
                    'info_file_handler',
                    'fluent'
                ]
            }
        }
        logging.config.dictConfig(logging_config)

    class DoSomething():
        def run(self):
            setup_logs()
            logger.info("Here")
    The logging configuration sets up a default logger to stdout, a file handler, and also a 3rd-party log streaming service using fluent. I see the local file output, but nothing comes through in the Prefect UI.
    export PREFECT__LOGGING__EXTRA_LOGGERS="['root']"
    
    [logging]
    extra_loggers = "['root']"
    I have set up a toml file, and also tried to set the environment variable before running the agent and the flow itself. However, I still do not see the logs in the Prefect UI. Is there something else I need to do?
    c
    • 2
    • 1
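    A minimal sketch of the extra-loggers setup that is usually suggested, the key detail being a named logger rather than the root logger, since EXTRA_LOGGERS attaches Prefect's handler to loggers looked up by name (my_logs is an illustrative name, and the env var must be present in the environment where the flow run actually executes):

    # export PREFECT__LOGGING__EXTRA_LOGGERS="['my_logs']"
    import logging
    from prefect import task, Flow

    logger = logging.getLogger("my_logs")
    logger.setLevel(logging.INFO)

    @task
    def do_something():
        logger.info("Here")  # should now appear in the UI as well as locally

    with Flow("extra-loggers-example") as flow:
        do_something()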
  • n

    Nakul Gowdra

    10/05/2020, 2:31 AM
    Hi Team, has anyone encountered issues running flows on Prefect Cloud? The flow runs perfectly locally (on a Windows PC) but fails with "Failed to load and execute Flow's environment: TypeError('an integer is required (got type bytes)')". I have tried to register via flow.register() as well as the prefect CLI. Thanks in advance - BTW it uses the boto3 AWS library and Python 3.7.2
    • 1
    • 1
  • n

    Newskooler

    10/05/2020, 10:24 AM
    Hi Prefect community! 👋 Could someone (or more) please share any best practices with regard to registering a flow? If I have my code in a repository, and this repository is deployed to a server where Prefect is running (server and agent), what is an elegant way of registering a flow? Currently I need to open some kind of Python session (IPython or a notebook) and manually register each flow (and flow version). Surely there must be a more elegant way? :)
    r
    j
    j
    • 4
    • 18
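    One common pattern (a sketch, not the only option) is a small registration script kept in the repository and run as a deployment step, so flows never have to be registered from a notebook; the module paths and project name here are illustrative:

    # register.py - run once per deploy, e.g. "python register.py"
    from my_repo.flows.etl import flow as etl_flow
    from my_repo.flows.reporting import flow as reporting_flow

    for flow in [etl_flow, reporting_flow]:
        # registration bumps the flow version automatically when the flow has changed
        flow.register(project_name="production")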
  • r

    Robin

    10/05/2020, 11:52 AM
    Is there a possibility to access variables that were defined in tasks from within a state handler? For example, in each task of a mapped task I want to send a notification about the state change, and in that notification I would like to include a system id that is different for each task instance of the mapped task. Does somebody know how to access a variable from the task within the state handler?
    k
    • 2
    • 5
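    A minimal sketch of one approach, assuming the value you need is either the task's return value (carried on the Success state) or something exposed through prefect.context such as map_index; names are illustrative and behaviour can differ when custom Results are configured:

    import prefect
    from prefect import task, Flow

    def notify(task, old_state, new_state):
        # prefect.context is populated while the handler runs; map_index identifies the child run
        map_index = prefect.context.get("map_index")
        if new_state.is_successful():
            # for a finished task the return value (or a Result wrapping it) sits on the state
            system_id = new_state.result
            print(f"{task.name}[{map_index}] finished for system {system_id}")
        return new_state

    @task(state_handlers=[notify])
    def process(system_id):
        return system_id

    with Flow("notify-per-system") as flow:
        process.map(system_id=["sys-a", "sys-b", "sys-c"])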
  • s

    Sven Teresniak

    10/05/2020, 12:14 PM
    Is there any way I can identify a task-run by some runtime information? I need to query task-runs by information that is only available during runtime, based on a Parameter. I cannot use slugs or tags because I cannot set them to parameter values (or can I?). I'm still working on a Lock-like ResourceManager, but that's very difficult when it comes to scheduling-/parameter-dependent locking. Creating a lock on constants (e.g. a constant string) seems rather easy. What I need (and am trying to build) is tag concurrency limits for the standalone version. :)
    k
    • 2
    • 30
  • a

    ale

    10/05/2020, 12:18 PM
    Hi folks, we’re configuring Prefect to run tasks with Fargate Agent. Is there a way to tell Prefect to create task definitions with a predefined naming convention instead of flow_name along with tags flow_id and flow_version?
    j
    • 2
    • 1
  • i

    itay livni

    10/05/2020, 12:27 PM
    Hi - is there an example or documentation for naming mapped tasks? I could not find it 🙃. Thanks
    j
    • 2
    • 2
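    A minimal sketch of what appears to work on recent 0.13.x releases against Cloud/Server, assuming task_run_name accepts a template rendered from the task's call arguments so each mapped child gets its own name (names here are illustrative):

    from prefect import task, Flow

    @task(task_run_name="process-{item}")
    def process(item):
        return item * 2

    with Flow("named-mapped-runs") as flow:
        process.map(item=[1, 2, 3])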
  • a

    ale

    10/05/2020, 1:49 PM
    Hey folks, I was finally able to configure the Fargate Agent. Now I want to run a task on Fargate. I'm using the following config:
    FargateTaskEnvironment(
        taskRoleArn=ETL_TASK_ROLE_ARN,  # ARN of the task role
        executionRoleArn=ETL_EXECUTION_ROLE_ARN  #  ARN of the execution role
    )
    But I get the following error back from the Agent:
    An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Fargate requires task definition to have execution role ARN to support ECR images
    Any suggestions?
    s
    g
    • 3
    • 37
  • h

    Hui Zheng

    10/05/2020, 5:28 PM
    Hello, I have a question about the flow container's Docker registry storage. Can the Docker image be registered with multiple image tags? https://docs.prefect.io/api/latest/environments/storage.html#docker
    k
    • 2
    • 2
  • k

    Kyle Pierce

    10/05/2020, 6:10 PM
    If I want to make a task reusable across flows, do I need to make it into a package, or is there some way to access modules in other files? I looked in the documentation and there isn't anything that explains how to reuse tasks.
    k
    • 2
    • 2
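    A plain importable module is usually enough - no package needed as long as the module is on the path (or baked into the storage image) wherever the flow is built and run. A sketch with illustrative file names:

    # shared_tasks.py
    from prefect import task

    @task
    def clean_records(records):
        return [r for r in records if r]

    # flow_a.py
    from prefect import Flow
    from shared_tasks import clean_records

    with Flow("flow-a") as flow_a:
        clean_records([1, None, 2])

    # flow_b.py - the same task object is reused in a different flow
    from prefect import Flow
    from shared_tasks import clean_records

    with Flow("flow-b") as flow_b:
        clean_records(["x", "", "y"])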
  • t

    Tenzin Choedak

    10/05/2020, 6:38 PM
    Hi prefect community and prefect team 👋 Is there any desire for a builtin prefect task for kafka flows? At my org, within the data engineering team we hardly ever run live producers and consumers - those are typically maintained and developed by the application team who need that sub-second responsiveness to events. However, we do own batch consumers and producers - either to pickup a batch of new events from the application stack or to push a batch of updates to the application stack. It seems possible to at least design a batch consume + batch produce task for a kafka integration within prefect. How do others and the prefect team feel about that?
    k
    • 2
    • 6
  • n

    Newskooler

    10/05/2020, 7:15 PM
    Hi 👋 Can someone help with this question please? https://stackoverflow.com/questions/64208540/in-prefect-can-a-task-value-be-cached-for-the-duration-of-the-flow-run
    k
    • 2
    • 1
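    For the linked question: within a single flow run a task executes once and its result is handed to every downstream task that depends on it, which effectively caches it for the duration of the run. A minimal sketch (illustrative names); caching across flow runs is a separate feature (cache_for / cache_validators):

    from prefect import task, Flow

    @task
    def expensive_lookup():
        print("running the expensive lookup once")
        return {"token": "abc"}

    @task
    def step_one(cfg):
        return cfg["token"]

    @task
    def step_two(cfg):
        return len(cfg)

    with Flow("reuse-within-a-run") as flow:
        cfg = expensive_lookup()   # runs a single time per flow run
        step_one(cfg)              # both downstream tasks receive the same result
        step_two(cfg)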
  • h

    Hui Zheng

    10/05/2020, 11:05 PM
    Hello, I changed the name of one of my tasks from get_timestamp() to init_flow_run(). It runs fine with Docker storage locally, but after I pushed the new flow to Prefect Cloud, I encountered this error:
    Unexpected error while running flow: KeyError('Task slug init_flow_run-1 not found in the current Flow; this is usually caused by changing the Flow without reregistering it with the Prefect API.')
    k
    • 2
    • 6
  • b

    Bridget Pelletier-Ross

    10/06/2020, 5:37 AM
    Hi all! 👋 can anyone here tell me if there’s a security review of prefect I can look at? I’m considering using core in production at my company. Thank you!
    👍 1
    c
    • 2
    • 1
  • g

    gunarevuri

    10/06/2020, 6:15 AM
    Hi guys, I'm new to Prefect. I ran a very basic flow and then wanted to see it in the UI, but I am getting an error like this: "ValueError: Project my_first_flow not found. Run
    client.create_project("my_first_flow")
    to create it." And here is my basic code
    from prefect import task, Flow
    @task
    def hello_world():
        return "hello prefect"

    @task
    def prefect_say(s: str):
        print(s)

    with Flow("my_first_flow") as f:
        r = hello_world()
        s = prefect_say(r)

    # f.run()
    f.visualize()
    f.register("my_first_flow")
    j
    • 2
    • 1
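    The error means the project does not exist on the backend yet; a minimal sketch of creating it once and then registering into it, reusing the flow f from the snippet above and the Client call the error message itself suggests (the same can also be done with the prefect create project CLI command):

    from prefect import Client

    client = Client()
    client.create_project(project_name="my_first_flow")  # one-time setup

    # afterwards the existing call works, since the first argument
    # of flow.register() is the project name
    f.register(project_name="my_first_flow")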
  • a

    Alfie

    10/06/2020, 11:03 AM
    Hi team, I do not see a delete-flow option in Webhook storage; does that mean I need to manage deletion by myself? Thanks
    j
    • 2
    • 5
  • d

    Darren Forsdike

    10/06/2020, 12:29 PM
    Hi, I'm new to Prefect. I'm looking for a scheduler to run various Docker images from a private registry at scheduled times, and some other tasks in the future. Is that something that is pretty easy to configure?
    j
    • 2
    • 2
  • j

    Jacques

    10/06/2020, 2:04 PM
    We used to use LocalDaskExecutor(scheduler="threads") for running one-off flows, then started having issues with tasks being repeated, and it was recommended we switch to DaskExecutor, which we've been using with DaskExecutor(cluster_kwargs={"processes": False}). We've now run into some new bugs with the latest prefect (a side note, but here: https://github.com/PrefectHQ/prefect/issues/3443) - I'm trying to understand what the difference is between LocalDaskExecutor and DaskExecutor using threads and a temporary cluster. Is there a performance/reliability advantage of using one over the other?
    j
    • 2
    • 2
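    For comparison, a rough sketch of the two configurations being discussed (a summary rather than an authoritative statement, since details shift between releases): LocalDaskExecutor drives dask's simple local scheduler directly, while DaskExecutor with processes=False starts a temporary dask.distributed LocalCluster whose workers are threads in the same process but still go through the distributed scheduler.

    from prefect.engine.executors import DaskExecutor, LocalDaskExecutor

    # dask's local threaded scheduler; no distributed machinery involved
    local_threads = LocalDaskExecutor(scheduler="threads")

    # a temporary dask.distributed LocalCluster; processes=False keeps workers as
    # threads in the current process
    temporary_cluster = DaskExecutor(
        cluster_kwargs={"processes": False, "n_workers": 1, "threads_per_worker": 8}
    )

    # either can be handed to flow.run(executor=...) or attached to an environment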
  • b

    Benjamin Filippi

    10/06/2020, 3:53 PM
    Hi guys, there's a seemingly simple thing I can't manage to do... I want to cascade mappings. I have two lists: L1=[A,B] and L2=[1,2,3]. I want to generate 6 tasks; the expected execution plan is as follows: execute (A,1) (A,2) (A,3) (B,1) (B,2) (B,3); reduce A from the first 3; reduce B from the last 3; then reduce (A,B). What simple trick am I missing?
    j
    • 2
    • 2
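    One way to get that shape in 0.x (a sketch, since nested or cascaded maps are not directly supported): build the cross product in an upstream task, map over the flat list of pairs, then group the mapped results back per element of L1 in a reduce task. Names are illustrative:

    from prefect import task, Flow

    @task
    def cross_product(l1, l2):
        # the 6 pairs: (A,1) (A,2) (A,3) (B,1) (B,2) (B,3)
        return [(a, n) for a in l1 for n in l2]

    @task
    def execute(pair):
        a, n = pair
        return (a, f"{a}-{n}-done")

    @task
    def reduce_per_key(results):
        # reduce A from its 3 results and B from its 3 results
        grouped = {}
        for key, value in results:
            grouped.setdefault(key, []).append(value)
        return grouped

    @task
    def reduce_all(grouped):
        # final reduce over (A, B)
        return sorted(grouped)

    with Flow("cascaded-mapping") as flow:
        pairs = cross_product(["A", "B"], [1, 2, 3])
        results = execute.map(pairs)
        grouped = reduce_per_key(results)
        reduce_all(grouped)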
  • m

    Mitchell Bregman

    10/06/2020, 4:27 PM
    Hi there, we are using Prefect Cloud to handle our workflow management. In the process of standing it up, our deployment recipe uses a Kubernetes Agent and Docker storage. When registering a flow to Cloud without specifying an environment, the flow is submitted for execution and runs entirely fine. However, for long-running map tasks, we'd like to consider using DaskKubernetesEnvironment. Perhaps we can use DaskKubernetesEnvironment for all of our flows. Upon registering a new flow, as shown below, to Cloud and submitting a Quick Run, we get the error: Kubernetes Error: Back-off pulling image. When not specifying the DaskKubernetesEnvironment, all registering, deploying, and flow execution works just fine. Here is a sample flow that I am trying to use Dask for:
    with Flow("test-flow") as flow:
      numbers = numbers_task()
      first_map = map_task.map(numbers)
      second_map = map_task.map(first_map)
      reduction = reduce_task(second_map)
    
    flow.storage = Docker(
      registry_url="parkmobile-docker.jfrog.io",
      image_name="test-flow",
      image_tag="0.0.1"
    )
    
    flow.environment = DaskKubernetesEnvironment(min_workers=2, max_workers=4)
    
    flow.register("test")
    Any ideas as to why the DaskKubernetesEnvironment is throwing off the flow execution?
    c
    • 2
    • 11
c

Chris White

10/06/2020, 4:37 PM
Hi Mitchell - would you mind moving this code snippet into the thread? It’s large so it takes up the whole chat window. Otherwise, how is your agent authenticating with your Docker registry? If it’s through a K8s secret, you probably need to mount the same secret to your daskK8s environment (There should be a keyword argument for that)
m

Mitchell Bregman

10/06/2020, 4:49 PM
So you are saying that there needs to be a separate daskk8 environment with a separate agent?
I figured the single Kubernetes Agent I have stood up would spawn off some sort of Dask environment for me, and then execute there
Let me know if I’m thinking about it incorrectly
c

Chris White

10/06/2020, 4:51 PM
No, nothing that extreme; the DaskKubernetesEnvironment requires the ability to create new K8s jobs with your Flow's Docker image, which requires that it have the ability to pull from your docker registry. Typically, docker registries are authenticated via K8s secrets (let me know if you use something different). Check out the docs here that discuss the image_pull_secret kwarg: https://docs.prefect.io/api/latest/environments/execution.html#daskkubernetesenvironment
m

Mitchell Bregman

10/06/2020, 4:54 PM
@Kenan Alptekin let me know if you have some additional context, but I believe we authenticated via
prefect agent start kubernetes --env IMAGE_PULL_SECRETS=my-img-pull-secret
c

Chris White

10/06/2020, 4:55 PM
Perfect, so then that doc explains how to pass that same secret onto your Dask K8s Environment by name via the image_pull_secret kwarg
m

Mitchell Bregman

10/06/2020, 4:56 PM
I see, so this would need to be passed via the kwarg. Ideally, I was hoping that --env IMAGE_PULL_SECRETS=my-img-pull-secret would handle the rest, but my knowledge of how k8s works is quite minimal... will report back with any findings!
c

Chris White

10/06/2020, 4:57 PM
yup, that’s accurate. Environments are getting a large overhaul in the coming weeks, so these sorts of issues will become much more straightforward to debug
🙌 1
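Pulling the thread's resolution together, a minimal sketch of passing the agent's pull secret through to the environment (the secret, registry, and image names are the ones from the conversation, reused illustratively):

from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker

flow.storage = Docker(
    registry_url="parkmobile-docker.jfrog.io",
    image_name="test-flow",
    image_tag="0.0.1",
)

# image_pull_secret names the K8s secret the dynamically created dask pods
# should use when pulling the flow image from the private registry
flow.environment = DaskKubernetesEnvironment(
    min_workers=2,
    max_workers=4,
    image_pull_secret="my-img-pull-secret",
)

flow.register("test")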