prefect-community

    Avi A

    05/31/2020, 2:18 PM
    Core UI - how do I stop a running flow? I couldn’t find any reference to it, and the only way for me to do this right now is to hard kill the agent itself. Any way of doing so from the UI / CLI?

    Max Lei

    05/31/2020, 11:08 PM
    Hi all, is there a recommended approach for objects that cannot be pickled? For example, in a machine learning pipeline where you need to do some ETL, some of the steps require tools whose objects cannot be pickled (SFrames, Spark, etc.), so I imagine I'd put all of those items in one step and pipe the results to a CSV file; the next step can then train a model using the location of the CSV file. But I may want one more step that computes metrics and sends them to MLflow, and it's possible the model cannot be pickled easily either. Is Prefect the wrong tool for this job? I would also like a cron job and a UI to look at the results. It seems I may need to engineer the problem around Prefect versus a single script and Linux crontab. Any suggestions?
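    (For context, a minimal sketch of the path-passing pattern described above, assuming Prefect 0.x; the task bodies and file names are illustrative, not from the thread:)
    from prefect import Flow, task

    @task
    def etl() -> str:
        # Do the Spark/SFrame work here and persist it to disk; only the
        # small, picklable file path crosses the task boundary.
        csv_path = "train.csv"
        return csv_path

    @task
    def train(csv_path: str) -> str:
        # Load the CSV, fit the model, and save it to disk instead of
        # returning the (possibly unpicklable) model object itself.
        model_path = "model.bin"
        return model_path

    @task
    def log_metrics(model_path: str) -> None:
        # Load the model from disk and push metrics to MLflow here.
        pass

    with Flow("ml-pipeline") as flow:
        log_metrics(train(etl()))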

    Richard Pitre

    06/01/2020, 12:04 AM
    I have the same task running at different nodes of my graph. Now I have a task that must run after one of them finishes. How can I set up this dependency? I am using the Flow framework.
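    (A minimal sketch of one way to wire this up, assuming Prefect 0.x: calling a task twice in a flow context creates two copies, and a downstream task with the `any_successful` trigger runs once its upstreams are finished with at least one success; names are illustrative:)
    from prefect import Flow, task
    from prefect.triggers import any_successful

    @task
    def work(n):
        return n * 2

    @task(trigger=any_successful)
    def after_any():
        # Fires once the upstream copies have finished,
        # as long as at least one of them succeeded.
        print("upstream copy finished")

    with Flow("copies") as flow:
        a = work(1)
        b = work(2)  # the same task at a different node of the graph
        done = after_any()
        done.set_upstream(a)
        done.set_upstream(b)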

    philip

    06/01/2020, 9:09 AM
    I want to get my Parameter in a state handler. It works in a local test, but on Prefect Server it raises an error like this:
    Exception raised while calling state handlers: KeyError(<Parameter: fisrt_num>,)
    How can I solve this problem? Here is my code:
    from prefect import task, Flow, Parameter

    @task
    def add(x, y):
        return x+y
    
    def post_to_slack(task, old_state, new_state):
        if new_state.is_failed():
            msg = "Task {0} finished in state {1}".format(task, new_state)
            
        if new_state.is_finished():
            p = new_state.result[num].result
            print(p)
            
        return new_state
    
    with Flow("Tutorial", state_handlers=[post_to_slack]) as flow:
        x = Parameter("fisrt_num")
        y = Parameter("Second_num")
    
        num = add(x,y)
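    (For reference, a minimal sketch of reading parameter values inside a handler through `prefect.context`, assuming Prefect 0.x; it deliberately keeps the question's misspelled parameter name:)
    import prefect

    def post_to_slack(obj, old_state, new_state):
        # For a flow-level handler, `obj` is the Flow itself.
        if new_state.is_finished():
            # During a run, prefect.context carries a `parameters` dict
            # keyed by parameter name, so no Task object is needed.
            params = prefect.context.get("parameters", {})
            print(params.get("fisrt_num"), params.get("Second_num"))
        return new_state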

    Arsenii

    06/01/2020, 11:01 AM
    A general question to the dev team: are there any plans to allow more granular control over flow execution rights? For example, some of the flows I made are run exclusively and automatically by a script (through `client.create_flow_run(version_group_id=...)`), and there's a chance of screwing up some important data if a user tries to run them manually. Another use case I can think of is letting certain members of a team access only specific projects. Thanks!

    Arsenii

    06/01/2020, 11:08 AM
    An unrelated technical question: currently I'm running a DockerAgent on an AWS EC2 server, and it's pulling flow images from an ECR. On that EC2 instance we need to run
    $(aws ecr get-login --region redacted-region --registry-ids 1234567890 --no-include-email)
    to generate an ECR Docker authorization token and log in -- this is standard practice. However! This token is only valid for 12 hours, so the command above is re-run automatically. Now the weird thing is, the Prefect Agent doesn't actually "refresh" the auth token and gives me a "Your authorization token has expired" error, even though the command above was re-run. The only thing that helps is restarting the Agent, at which point it starts using the new token again. This seems to be some kind of bug, since the configuration of the Docker server should not concern the Agent -- or am I missing some configuration? Thanks

    Sumant Agnihotri

    06/01/2020, 12:33 PM
    Hi, quick question: is it fine to use `if` inside a flow like this?
    def flow_func(ip):
        with Flow('flow func') as flow:
            task1()
            if ip == True:
                task2()
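    (A minimal sketch of the usual alternative, assuming Prefect 0.11+ where `case` exists: a plain Python `if` runs once at flow build time, so runtime branching is expressed with `prefect.tasks.control_flow.case`:)
    from prefect import Flow, Parameter, task
    from prefect.tasks.control_flow import case

    @task
    def task1():
        print("always runs")

    @task
    def task2():
        print("runs only when ip is True")

    with Flow("flow func") as flow:
        ip = Parameter("ip", default=False)
        task1()
        with case(ip, True):  # evaluated at runtime, not at build time
            task2()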

    Howard Cornwell

    06/01/2020, 2:34 PM
    Hey, a question about secrets. The documentation appears to suggest that setting & retrieving secrets is disabled on a self-hosted server? I appreciated the `cloud` tags throughout the documentation, but I think you're missing one on the linked header.
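    (For reference, a minimal sketch of local secrets on a self-hosted setup, assuming Prefect 0.x: with local secrets enabled (the default outside Cloud), values are read from `prefect.context`, which can be populated via config.toml or an environment variable; the secret name and value are illustrative:)
    import os

    # Must be set before Prefect is imported, since config is read at import time.
    os.environ["PREFECT__CONTEXT__SECRETS__MY_TOKEN"] = "super-secret"

    from prefect.client.secrets import Secret

    print(Secret("MY_TOKEN").get())  # resolved locally, no Cloud required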

    kelv

    06/01/2020, 5:02 PM
    Hi all! I'm looking into a single tool to unify data flows, and Prefect looks interesting! From the installation docs I can see it's simply a `pip install`. I am curious what high-availability options there are in case the master/scheduling/coordinator(?) node fails, and where the configurations/flows are stored so that one can restore from them.

    Kesav Kolla

    06/01/2020, 6:15 PM
    I have a couple of questions and would like to know how Prefect addresses these. • Ability to add a human workflow: basically I want to inject an approve/reject step or some data-entry operations performed by a human. Is it possible to achieve that? • Is it possible to use Apache Arrow to pass data between tasks? Apache Arrow seems to be an interesting data representation whereby we can avoid extra serialization/deserialization.
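    (On the approval point, a minimal sketch of the built-in pause mechanism, assuming Prefect 0.x: a task with the `manual_only` trigger enters a Paused state and waits until a human resumes the run from the UI or API; names are illustrative:)
    from prefect import Flow, task
    from prefect.triggers import manual_only

    @task
    def prepare():
        return 42

    @task(trigger=manual_only)
    def approved_step(x):
        # The flow run pauses before this task until someone resumes it.
        print(f"approved, continuing with {x}")

    with Flow("approval") as flow:
        approved_step(prepare())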

    Will Milner

    06/01/2020, 6:40 PM
    Is the only way to register a flow to call the register method on the flow from the same server that Prefect is running on?
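    (For reference, a minimal sketch of registering from another machine, assuming Prefect 0.x with the backend set to "server" and the server's GraphQL port reachable from the client; the hostname is illustrative and the exact endpoint shape may vary by version:)
    from prefect import Flow, task
    from prefect.client import Client

    @task
    def hello():
        print("hi")

    with Flow("remote-registration") as flow:
        hello()

    # Point the client at the remote server's GraphQL API instead of localhost.
    client = Client(api_server="http://my-prefect-server:4200/graphql")
    client.register(flow, project_name="Test Project")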

    Jared

    06/01/2020, 6:52 PM
    Testing out Prefect as an alternative for an enterprise deployment and running into an odd issue with scheduled flows. For some reason I can only get `IntervalSchedule` flows to run 10 times (this is using Core, not Cloud). Am I missing something obvious? When I initially register the flow, 10 runs get queued and executed, but no more. In the terminal running the server, the scheduler wakes, schedules 0 runs, and sleeps, even after all 10 original runs have passed. On-demand runs in the UI still work at this point. I'll comment with what I'm doing to reproduce.

    Kesav Kolla

    06/01/2020, 7:09 PM
    I see that Prefect tasks are not long-running. E.g. the database fetch task opens a connection, executes the query, and closes the connection. Is it possible to develop a task that runs forever? I could write a PostgreSQL task which initializes a connection pool that stays open forever; all subsequent query executions would re-use connections from the pool. Another use case is a gRPC or Arrow Flight IPC server which runs forever and serves other tasks the necessary data -- something similar to what Zeebe or Netflix Conductor provide.

    Matthias

    06/01/2020, 8:55 PM
    Dear Prefect Community 👋 I’d like to make a Kubernetes secret (containing GCP credentials) available in the worker pods created by the Kubernetes agent. Is this possible?
    👋 2

    Will Milner

    06/02/2020, 3:13 PM
    I have a question about securing Prefect. Right now I have a Prefect server deployed to an AWS instance. I expose port 8080 to grant access to the server, but the GraphQL port 4200 must also be exposed because of the client-side JavaScript. However, I don't want GraphQL to be exposed, since that allows users to send arbitrary commands to the Prefect server. Is there any way to give access to just the web UI, but not allow access to GraphQL?

    jorwoods

    06/02/2020, 5:37 PM
    What is necessary to get Prefect to use checkpointing? It seems there is something missing from my simplified test code, but unclear what:
    from prefect import Flow, task, unmapped, Parameter
    from prefect.engine.results import LocalResult
    from prefect.engine.executors import LocalDaskExecutor
    import prefect
    
    lr = LocalResult(location='{flow_name}-{task_name}-{map_index}.txt')
    
    @task(log_stdout=True, checkpoint=True,)
    def add(x, y):
        return x + y
    
    with Flow('iterated map', result=lr) as flow:
        y = unmapped(Parameter('y', default=10))
        mapped_result = add.map([1, 2, 3], y=y)
    
    flow.run(executor=LocalDaskExecutor())
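    (For context, a likely missing piece, assuming Prefect 0.x: checkpointing is enabled automatically for Cloud/Server runs but defaults to off for plain local `flow.run()` calls, so it has to be switched on explicitly:)
    import os

    # Must be set before `import prefect` (or exported in the shell),
    # since Prefect reads its config at import time.
    os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

    import prefect  # config now has flows.checkpointing = true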

    jorwoods

    06/02/2020, 9:01 PM
    One more question around checkpointing:
    target (str, optional): location to check for task Result. If a result
                exists at that location then the task run will enter a cached state.
                `target` strings can be templated formatting strings which will be
                formatted at runtime with values from `prefect.context`
    If I have a mapped task and I want it to create separate outputs per map, I see `map_index` in `prefect.context`, but that relies on my arguments being in the same order each time, correct? Is there a way to pass Parameter values into this `target` or the result's location kwarg, so that when I look at the directory containing my checkpoint files I quickly know which ones have completed?

    Crawford Collins

    06/02/2020, 11:40 PM
    Hi, I'm trying to do a lot of work with mapped functions. How do I merge the results of 2 tasks together? For example, `imputed_categorical_dfs` returns two objects, and I need to merge these with the output of another task which does the same. Here is my code, which does not work:
    transformed_df_map = merge_transformed_data.map(
            df1=[*imputed_categorical_dfs, *encoded_df_map],
            df2=[*imputed_numeric_dfs] + [*yeo_johnson_dfs],
        )
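    (A minimal sketch of one workaround, assuming the goal is to concatenate two task outputs into single mapped iterables: `*` unpacking happens at flow build time, before any task results exist, so the concatenation itself has to be a task; the stub tasks stand in for the question's dataframe tasks:)
    from prefect import Flow, task

    @task
    def impute_categoricals() -> list:
        return ["df_a", "df_b"]  # stand-ins for the real dataframes

    @task
    def impute_numerics() -> list:
        return ["df_c", "df_d"]

    @task
    def combine(a: list, b: list) -> list:
        # Runs at flow-run time, once both upstream results actually exist.
        return list(a) + list(b)

    @task
    def merge_transformed_data(df1, df2):
        return (df1, df2)

    with Flow("merge-mapped") as flow:
        df1s = combine(impute_categoricals(), impute_categoricals())
        df2s = combine(impute_numerics(), impute_numerics())
        transformed_df_map = merge_transformed_data.map(df1=df1s, df2=df2s)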

    Ben Davison

    06/03/2020, 10:01 AM
    Hiya, for Prefect Server: are there any statsd metrics that can be hooked into, or any events we can use for monitoring Prefect internally?

    Avi A

    06/03/2020, 12:20 PM
    Hey there, I have a problem with Prefect Server. My setup used to work, but now there might be something wrong with the version or something. I'm getting this error in the UI when running a flow:
    [3:18pm]: Exception raised while calling state handlers: HTTPError('400 Client Error: Bad Request for url: <http://localhost:4200/graphql/alpha>')
    Any idea on how to start debugging?

    Rodrigo Neves

    06/03/2020, 1:27 PM
    Hello guys, first time here and a recent user of Prefect. First of all I need to say that so far I'm loving it -- keep up the awesome work!! I need help composing a flow of several tasks with map. Currently I can do it, but each task is only processed after all tasks from the previous step are done. This is sub-optimal, since the branches of the pipeline (each .map()) are independent of each other. Is there any way of achieving that out of the box?
    def run_flow_step_0_map(self, df):        
            cols = list(df.items())
            with Flow("pipeline_map") as flow:
                col = self.get_data.map(unmapped(self), cols)
                col = self.task1.map(unmapped(self), col)
                col = self.task2.map(unmapped(self), col)
                col = self.task3.map(unmapped(self), col)
                col = self.task4.map(unmapped(self), col)
                col = self.task5.map(unmapped(self), col)
                result = self.task5.map(unmapped(self), col)
    
            return flow
    (The explanation may be confusing; if you need extra info, just say so.)
    👍 1

    Simone Cittadini

    06/03/2020, 2:59 PM
    Hi! I'm looking at the code and can't find my way around; can you kindly point me to the place where Flows are run on the server? I use eliot for logs, and I'd like flow runs to live in an action context. That's easily done locally by overriding the run method, but I can't find my way around the server code. (Coming from Airflow -- love your work, it's a lifesaver!)
    👀 1

    Radu

    06/03/2020, 3:09 PM
    Hello everyone! I just joined this group, as I was assigned to do a POC with Prefect 🙂 I hope I'll get useful information reading through these channels. Cheers!
    😄 2
    :upvote: 4

    Mary Clair Thompson

    06/03/2020, 3:10 PM
    Hi folks! I'm running prefect on a vm and having issues when the box reboots. The docker containers and prefect server restart just fine, but the agent doesn't--so scheduled jobs don't start. What's the best way to deal with this?

    Nelson

    06/03/2020, 3:40 PM
    Hi everyone! We're trialling Prefect (good docs and user experience so far); I'm just overwhelmed when going from a single local environment to a multi-environment distributed setup (this will help a lot: https://github.com/PrefectHQ/prefect/issues/2508#issue-613607085 !). To parametrise resources, I tried the below `config.toml` to have different buckets per env:
    source_data_bucket = "${environments.${environment}.source_data_bucket}"
    transient_data_bucket = "${environments.${environment}.transient_data_bucket}"
    
    [environments]
    
        [environments.dev-nelson]
            source_data_bucket = "<s3://REDACTED>"
            transient_data_bucket = "<s3://REDACTED>"
    
        [environments.prod]
            source_data_bucket = "<s3://REDACTED>"
            transient_data_bucket = "<s3://REDACTED>"
    I can print `prefect.config.transient_data_bucket` inside a task, but when it's used as `S3Result(bucket=prefect.config.transient_data_bucket)` it fails with `Invalid bucket name ""`. How are others doing this? Note I'm providing this result as a task config:
    @task(
        target="{date:%Y-%m-%d}/crunchbase-raw.cloudpickle",
        result=S3Result(bucket=prefect.config.transient_data_bucket)

    John Ramirez

    06/03/2020, 3:43 PM
    hey everyone - I need to connect to a Postgres DB through an SSH tunnel. Is there a native way to do this with the `Postgres` task, or do I need to make something custom?
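    (For reference, a minimal sketch of the custom route, assuming the third-party `sshtunnel` and `psycopg2` packages rather than anything Prefect-specific; hostnames and credentials are illustrative:)
    import psycopg2
    from prefect import task
    from sshtunnel import SSHTunnelForwarder

    @task
    def query_via_tunnel() -> list:
        # Open an SSH tunnel to the bastion, then connect to Postgres
        # through the forwarded local port.
        with SSHTunnelForwarder(
            ("bastion.example.com", 22),
            ssh_username="deploy",
            ssh_pkey="~/.ssh/id_rsa",
            remote_bind_address=("db.internal", 5432),
        ) as tunnel:
            conn = psycopg2.connect(
                host="127.0.0.1",
                port=tunnel.local_bind_port,
                dbname="mydb",
                user="me",
                password="secret",
            )
            with conn, conn.cursor() as cur:
                cur.execute("SELECT 1")
                return cur.fetchall()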
    👀 1

    Darragh

    06/03/2020, 3:51 PM
    Hey guys, having 2 problems trying to use FargateTaskEnvironment with FargateAgent. The docs page confused me ever so slightly, so I'm not 100% sure I have it all configured correctly. flow.environment is configured as per the following snippet, and it builds and registers with my server:
    flow.environment = FargateTaskEnvironment(
            launch_type="FARGATE",
            region="eu-west-1",
            cpu="256",
            memory="512",
            networkConfiguration={
                "awsvpcConfiguration": {
                    "assignPublicIp": "ENABLED",
                    "subnets": ["subnet-X"],
                    "securityGroups": ["sg-Y"],
                }
            },
            family="my_flow",
            taskRoleArn="arn:aws:iam::X:role/Role",
            executionRoleArn="arn:aws:iam::X:role/Role",
            containerDefinitions={
                "name": "my-flow",
                "image": "my-flow",
                "command": [],
                "environment": [],
                "essential": True,
            }
        )
    Fargate Agent:
    echo "export REGION_NAME=${CDK_DEPLOY_REGION}" > .env
    echo "export AWS_DEFAULT_REGION=${CDK_DEPLOY_REGION}" >> .env
    echo "export AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}" >> .env
    echo "export AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}" >> .env
    source .env
    nohup prefect agent start fargate > prefect_fargate.log 2>&1 & disown
    Everything seems fine until I try to run it. There’s no update in the agent logs to show it’s been picked up, and the UI just says:
    Failed to load and execute Flow's environment: AttributeError("'str' object has no attribute 'get'")
    Anything immediate/obvious/stupid stand out? As an aside, I'm guessing I'm missing a bunch of config for the agent, but the docs are a little unclear on that one; the top level of the docs [https://docs.prefect.io/orchestration/agents/fargate.html#installation] seems to suggest you only need the handful of vars I added. UPDATE: Some progress -- it's now giving a meaningful error:
    Parameter validation failed: Missing required parameter in containerDefinitions[0].logConfiguration: "logDriver
    Fixing it

    Marwan Sarieddine

    06/03/2020, 6:33 PM
    Hey everyone - I was wondering if someone can tell me why the following code runs fine without errors when calling `flow.run()`, but `flow.register()` flags an error: the healthcheck throws the error below, which I see referenced in the documentation. I am using Prefect v0.11.4.
    ValueError: Mapped tasks with custom result locations must include
    {filename} as a template in their location.
    Please see the code below:
    import os
    from prefect import task, Flow
    from prefect.engine.results import S3Result
    
    
    @task
    def gen_list():
        return [x for x in range(10)]
    
    
    @task
    def add(x, y):
        return x + y
    
    
    @task
    def multiply(x, y):
        return x * y
    
    
    result = S3Result(
        bucket=os.environ["AWS_BUCKET"],
        location='prefect-testing/{task_name}/{map_index}.prefect'
    )
    
    with Flow('Test Flow', result=result) as flow:
        x = gen_list()
        y = gen_list()
        added = add.map(x, y)
        multiplied = multiply.map(added, added)
    
    # flow runs fine if running locally
    flow.run()
    
    
    """
    flow fails to register
    ValueError: Mapped tasks with custom result locations must include
    {filename} as a template in their location.
    """
    flow.register('Test Project')

    Darragh

    06/03/2020, 8:22 PM
    Hey guys, having trouble running a flow on FargateTaskEnvironment. Configuration for the Environment is below:
    flow.environment = FargateTaskEnvironment(
            launch_type="FARGATE",
            region="eu-west-1",
            cpu="256",
            memory="512",
            networkConfiguration={
                "awsvpcConfiguration": {
                    "assignPublicIp": "ENABLED",
                    "subnets": ["subnet-X"],
                    "securityGroups": ["sg-Y"],
                }
            },
            family="my_flow",
            taskRoleArn="arn:aws:iam::X:role/CommonSuperRole",
            executionRoleArn="arn:aws:iam::X:role/CommonSuperRole",
            containerDefinitions={
                "name": "my-flow",
                "image": "my-flow",
                "command": [],
                "environment": [],
                "essential": True,
            }
        )
    I keep getting this error:
    An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Fargate requires task definition to have execution role ARN to support log driver awslogs
    So my questions: • I'm using the same uber role for both the taskRoleArn and the executionRoleArn -- probably not best practice, but it should work? • I've thrown every possible log- and CloudWatch-related policy/permission at it that I can think of, but nothing is taking. I can provide a dump of the permissions if need be. Any help massively appreciated; fairly stumped on this. Is there any way to get more debug info out of it?

    Marwan Sarieddine

    06/03/2020, 9:09 PM
    Hello everyone - I am unable to template a `target` or `Result.location` when mapped tasks are involved and deploying on a Dask cluster - I placed reproducible examples on [github](https://github.com/PrefectHQ/prefect/issues/2716) - would really appreciate it if someone can let me know what I am missing
    👀 1
Chris White

06/03/2020, 9:15 PM
I’m taking a look and will respond on the issue - thanks for bringing this to our attention!

Marwan Sarieddine

06/03/2020, 9:17 PM
@Chris White - thank you for your prompt response
👍 1