prefect-community

    Kevin

    05/26/2021, 4:07 PM
    Hi, question regarding this guide: https://medium.com/the-prefect-blog/seamless-move-from-local-to-aws-kubernetes-cluster-with-prefect-f263a4573c56 . If I want to have a prefect agent running in my EKS cluster, do I need to allow any inbound traffic, or is it enough to just allow the cluster to access the public internet? Also, is there a more restrictive rule I can have on the outbound end so I don't need to allow all traffic?

    Pedro Henrique

    05/26/2021, 7:43 PM
    Hello, every time I start the flow, Prefect does not find the path where my scripts are, and the same error is displayed: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'download'")
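
    [Editor's note: this error usually means the execution environment cannot import a local module the flow references. A minimal sketch of one common fix, assuming Docker storage; the paths and module name are illustrative:]
    from prefect.storage import Docker

    # bundle the local module into the image and put it on PYTHONPATH
    # so the flow can import it at runtime (paths are hypothetical)
    flow.storage = Docker(
        files={"/local/path/download.py": "/modules/download.py"},
        env_vars={"PYTHONPATH": "$PYTHONPATH:/modules"},
    )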

    Claire Herdeman

    05/26/2021, 9:02 PM
    Hi there, I'm new to Prefect and checking out options to see if Prefect will work for my company. I'm having trouble starting an ECS agent within our infrastructure that communicates with Prefect Cloud. We are using HIPAA-compliant AWS architecture as described here, i.e. most of the infrastructure is in private VPCs and exposed via a public VPC. Can you help me figure out if that's my issue?

    Bruno Murino

    05/26/2021, 10:48 PM
    Hi everyone — on one of the first tasks in a flow I create an AWS session object that is then passed as an argument to many subsequent tasks. Is there any way to set this AWS session globally for all tasks, so that I don't have to explicitly pass it? I tried adding it to prefect.context but that doesn't work.
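
    [Editor's note: boto3 sessions generally do not serialize between tasks, which is also why stashing one in prefect.context fails. A sketch of a common workaround: a lazy module-level helper so each task builds its own session; all names are illustrative:]
    import boto3
    from prefect import task

    _session = None

    def get_session():
        # lazily create one session per process instead of passing it around
        global _session
        if _session is None:
            _session = boto3.Session()
        return _session

    @task
    def upload(bucket, key, body):
        get_session().client("s3").put_object(Bucket=bucket, Key=key, Body=body)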

    nick vazquez

    05/27/2021, 1:12 AM
    I was curious, what is the limit on the number of mapped tasks? (not sure if I have the terms correct) I want to run some ad-hoc flows that potentially kick off anywhere between 100 and 100,000 tasks (the same task, but parameterized).

    Fabrice Toussaint

    05/27/2021, 9:51 AM
    Hi everyone! Is there a way to get the total number of flow runs with the GraphQL API? When I retrieve the results there is a limit of 100, but I have more flow runs than that and want to loop over them. My goal is to delete all flows that are not scheduled.
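
    [Editor's note: a minimal sketch of paging past the 100-row limit with limit/offset, assuming Prefect 1's Client; the exact schema fields are an assumption:]
    from prefect import Client

    client = Client()
    flow_runs, offset = [], 0
    while True:
        result = client.graphql(
            """
            query($offset: Int) {
              flow_run(limit: 100, offset: $offset) { id }
            }
            """,
            variables={"offset": offset},
        )
        batch = result["data"]["flow_run"]
        if not batch:
            break
        flow_runs.extend(batch)
        offset += 100
    print(len(flow_runs))  # total number of flow runs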

    Jacob Baruch

    05/27/2021, 12:30 PM
    Hi #prefect-community, I'm new to Prefect. If I'm using Prefect Cloud, does it mean that all infrastructure is managed by Prefect, or have I got it wrong?

    Chohang Ng

    05/27/2021, 4:55 PM
    I am having trouble using prefect.io: flows run successfully but nothing really happens. Here's what I mean by that. 1. I registered to Cloud successfully with an API key generated from a service account. 2. The upload to the database happened when I registered the flows. 3. But when it was scheduled to run, the upload to the database didn't happen, despite the fact that the UI shows success on all the flows. Any pointers out there? Appreciate all your help!
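
    [Editor's note: symptom 2 ("the upload happened when I register the flows") suggests the upload code runs at flow definition time rather than inside a task. A minimal sketch of the difference, with illustrative names:]
    from prefect import task, Flow

    @task
    def upload_to_db():
        ...  # database write

    with Flow("example") as flow:
        upload_to_db()  # deferred: executes when the flow run happens

    # By contrast, a plain (non-task) call inside the Flow block executes
    # immediately, i.e. at registration time, so the scheduled run has
    # nothing left to do and still reports success.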

    Nikola Lusic

    05/27/2021, 5:17 PM
    Hey guys, so I'm setting up the Prefect environment that will be running tasks on AWS ECS via the ECSRun configuration. Everything is working except for one thing: setting the cpu and memory parameters of ECSRun doesn't seem to have any effect. It seems that the agent always creates a task with 1 CPU and 2 GB RAM. Do you maybe have some pointers for this issue?
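
    [Editor's note: for reference, a minimal sketch of setting these values on the run config; cpu and memory may be ignored if the agent or run config supplies a pre-registered task definition, which could explain the behavior described:]
    from prefect.run_configs import ECSRun

    flow.run_config = ECSRun(
        cpu="1024",     # 1 vCPU, in ECS CPU units
        memory="2048",  # 2 GB, in MiB
    )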

    Irvin Tang

    05/27/2021, 5:28 PM
    Hi there, I’m currently testing a function that registers a flow to a specified environment like so (got inspiration from this: https://github.com/PrefectHQ/prefect/blob/master/tests/cli/test_get.py)
    def test_register_flow_docker(self):
            register_flow(
                flow=flow_docker,
                flow_name="test",
                project_name="test-project",
                labels=["test-label"],
                host_type="docker"
            )
    
            config = get_config("docker")
            with set_temporary_config(config):
                runner = CliRunner()
                result = runner.invoke(get, ["flows"])
                assert result.exit_code == 0
                assert (
                    "test-docker" in result.output
                    and "test-project" in result.output
                )
    I was wondering if there is another way to test flow registration? I want to be able to register the flow, and the test should confirm that the flow successfully registered.
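
    [Editor's note: a sketch of one alternative: register directly and confirm the flow exists via the GraphQL API. The query shape is an assumption based on Prefect 1's Hasura schema:]
    from prefect import Client

    flow_id = flow.register(project_name="test-project")

    result = Client().graphql(
        """
        query($id: uuid) {
          flow(where: {id: {_eq: $id}}) { name }
        }
        """,
        variables={"id": flow_id},
    )
    assert result["data"]["flow"][0]["name"] == "test"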

    Felipe Saldana

    05/27/2021, 5:40 PM
    Hey Prefect. I am trying to wrap my head around what's happening when using .bind() vs not using it.
    post_runner.set_upstream(all_pushes)
    post_runner.bind(mapped_run_name, mapped_data_root, mapped_collection_name)
    post_runner(mapped_run_name, mapped_data_root, mapped_collection_name)
    Using bind() works as I would expect: all_pushes completes entirely, and then post_runner starts and completes. Without bind(), post_runner does not wait for all_pushes to complete, mayhem ensues, and I get run() missing X required positional arguments messages. Thoughts?
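
    [Editor's note: the difference is that bind() modifies the task in place, while calling a task inside a Flow block binds a *copy* of it. With the call form, the copy has no upstream edge to all_pushes, and the original task (which received set_upstream) is left with unbound arguments, hence the missing-positional-arguments errors. A runnable sketch with simplified stand-in tasks:]
    from prefect import task, Flow

    @task
    def all_pushes():
        return "pushed"

    @task
    def post_runner(a, b, c):
        return (a, b, c)

    with Flow("bind-example") as flow:
        pushes = all_pushes()
        # in-place: the upstream edge and the arguments attach to the
        # same task instance
        post_runner.set_upstream(pushes)
        post_runner.bind(1, 2, 3)
        # the call form post_runner(1, 2, 3) is roughly
        # post_runner.copy().bind(1, 2, 3): the copy gets the arguments,
        # while the original (with the upstream edge) stays unbound.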

    Kathryn Klarich

    05/27/2021, 6:19 PM
    Hi all, I have a flow in which the first task grabs a set of files (where the number of files can vary), and the next task maps over the file locations returned by the first task to submit a batch job for each file. However, I have multiple batch jobs that should be submitted consecutively per file, and I want the first batch job to complete for each file before the second job starts. I don't want to wait for all the batch jobs from each round to complete before submitting the next round (i.e. if the first batch job completes for file_A, I want to go ahead and submit the next batch job for file_A). Is there a way of doing this inside the Prefect flow? I could set this up outside of the flow, but then I think I would have to re-register a new flow every time a new set of files is uploaded to S3. Thanks in advance!
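
    [Editor's note: chaining two mapped tasks gives exactly this per-file behavior: each child of the second map depends only on the corresponding child of the first, so file_A's second job can start before file_B's first finishes. A minimal sketch with hypothetical task names:]
    from prefect import task, Flow

    @task
    def get_files():
        return ["file_A", "file_B"]

    @task
    def run_first_job(f):
        return f  # submit batch job 1 for this file and wait

    @task
    def run_second_job(f):
        return f  # submit batch job 2 for this file and wait

    with Flow("per-file-pipelines") as flow:
        files = get_files()
        round_one = run_first_job.map(files)
        round_two = run_second_job.map(round_one)  # per-file, not per-round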

    Andrew Hannigan

    05/27/2021, 7:58 PM
    Can timeouts be specified at the flow level? Occasionally I've had issues with the dask cloudprovider process instantiating my Dask cluster on ECS, where it hangs indefinitely, preventing any tasks from actually starting. In this scenario the task timeout doesn't help, because it never gets to the point of running a task; it just hangs during setup. So it'd be helpful to be able to specify a timeout at the overall flow level, if that is currently possible.

    Pietro Immordino

    05/28/2021, 4:46 AM
    💃 Just arrived!

    Christian

    05/28/2021, 7:11 AM
    Hi all 👋 Quick question: are there any examples around that illustrate the use of a Docker container as a task (CreateContainer, RunContainer, …), similar to a ShellTask? I plan to execute a "docker run" task inside one of my flows… Ideally, this Docker container should mount a volume that provides input and receives the container run's output, to be parsed by the following task in the flow… Alternatively I could stream the model's output to stdout and parse the container's logs to extract the output information… My use case is that I have a container with a scientific model that should produce some output every day… Cheers, C
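
    [Editor's note: Prefect 1's task library includes Docker tasks that chain together much like a ShellTask. A sketch; the image name and volume spec are placeholders, and exact constructor arguments may vary by version:]
    from prefect import Flow
    from prefect.tasks.docker import (
        CreateContainer, StartContainer, WaitOnContainer, GetContainerLogs,
    )

    create = CreateContainer(
        image_name="my-model:latest",  # hypothetical image
        volumes=["/host/data:/data"],  # mount providing input/output
    )
    start = StartContainer()
    wait = WaitOnContainer()
    logs = GetContainerLogs()

    with Flow("docker-run") as flow:
        container_id = create()
        started = start(container_id=container_id)
        status = wait(container_id=container_id, upstream_tasks=[started])
        output = logs(container_id=container_id, upstream_tasks=[status])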

    Thomas Hoeck

    05/28/2021, 7:42 AM
    Hi, I sometimes experience a case where a flow has run and failed and produces an error log, but I can't access it; it only became accessible about 10 hours after the flow ran. Has anyone experienced something similar? I have tried it a couple of times now.

    John Ramirez

    05/28/2021, 1:17 PM
    Hey everyone! Is there an easy way to delay the execution of a task? I'm sending POST calls with a task, but then I need to wait 30 seconds before parsing the logs for data validation. Is there an easy way to do this?
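
    [Editor's note: the simplest approach is a small sleep task between the two. A minimal sketch with illustrative task names:]
    import time
    from prefect import task, Flow

    @task
    def post_call():
        ...  # send the POST request

    @task
    def wait_30s():
        time.sleep(30)  # fixed delay before parsing logs

    @task
    def parse_logs():
        ...  # data validation

    with Flow("delayed-parse") as flow:
        p = post_call()
        w = wait_30s(upstream_tasks=[p])
        parse_logs(upstream_tasks=[w])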

    Nikola Lusic

    05/28/2021, 2:18 PM
    Hey guys, a quick question: can flows defined with the ECSRun configuration be run in a threaded mode, similar to how LocalDaskExecutor has the scheduler=threads configuration?
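
    [Editor's note: the run config and the executor are set independently, so a flow can use ECSRun for where it runs and a threaded LocalDaskExecutor for how its tasks run there. A minimal sketch:]
    from prefect.run_configs import ECSRun
    from prefect.executors import LocalDaskExecutor

    flow.run_config = ECSRun()                              # where the flow runs
    flow.executor = LocalDaskExecutor(scheduler="threads")  # threaded task execution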

    ciaran

    05/28/2021, 2:23 PM
    Hey folks 👋 Does anyone have any recommendations/good practices for debugging/monitoring Prefect & K8s? I'm in a world of hurt at the mo where if a Prefect Job fails, I lose the pods and then I cannot access the logs. On AKS I can use Log Workspaces, but even then it's an awful experience as you have to try and query via timeranges and labels to identify the containers that just ran, then get the logs that way. On something like ECS and non-k8s, I can see all my terminated tasks and immediately click through to CloudWatch

    ciaran

    05/28/2021, 2:32 PM
    Do people just end up running an agent and then ELK or something alongside it?

    Charles Leung

    05/28/2021, 3:01 PM
    Hey Prefect team! We have created a lot of actions under Automations (various PagerDuty instances for different environments). Now we want to clean some up; how do we remove an action?

    Sean Harkins

    05/28/2021, 4:25 PM
    I'm seeing a linear memory increase in a flow, until the container where my task is running is killed with an OutOfMemoryError.

    Jacob Goldberg

    05/28/2021, 5:07 PM
    Hi everyone! I am working on setting up my first flow on AWS infrastructure. Using Docker storage and pushing to AWS ECR, and running on ECS + Fargate. When I try to build and register my container I get this error:
    [2021-05-28 09:59:27-0700] INFO - prefect.Docker | Pushing image to the registry...
    Traceback (most recent call last):
      File "flow_registry.py", line 44, in <module>
        build_and_register_all_flows(all_flows)
      File "flow_registry.py", line 35, in build_and_register_all_flows
        storage = storage.build()
      File "/XXX/XXX/XXX/XXX/XXX/XXX/XXX/new_etl/venv/lib/python3.8/site-packages/prefect/storage/docker.py", line 303, in build
        self._build_image(push=push)
      File "/XXX/XXX/XXX/XXX/XXX/XXX/XXX/new_etl/venv/lib/python3.8/site-packages/prefect/storage/docker.py", line 378, in _build_image
        self.push_image(full_name, self.image_tag)
      File "/XXX/XXX/XXX/XXX/XXX/XXX/XXX/new_etl/venv/lib/python3.8/site-packages/prefect/storage/docker.py", line 586, in push_image
        raise InterruptedError(line.get("error"))
    InterruptedError: denied: Your authorization token has expired. Reauthenticate and try again.
    This is confusing to me, because I have double-checked that my local AWS authentication is all set up properly in the environment. Is this referring to something else? I'm relatively new to Docker, ECR, and Prefect. Any ideas?

    Tom Baldwin

    05/28/2021, 8:26 PM
    I enjoyed @Kevin Kho's talk at MS Build. In the demo, ADF triggers a flow run using a Databricks notebook job to hit the Prefect GraphQL API, using Python's requests library. Is there a way to trigger a flow run directly from ADF (without Databricks as an intermediary)?

    itay livni

    05/29/2021, 12:56 AM
    Hi - Is there a way to mix serializers? For example, the default flow result serializer is set to pandas, but there are some tasks in the flow whose results I would like to pickle.
    flow.result = LocalResult(
            serializer=PandasSerializer(file_type="csv")
        )
    The task to pickle:
    num_clusters = value_count(df)
    Thanks in advance
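
    [Editor's note: results can be set per task, overriding the flow-level default. A minimal sketch, assuming Prefect 1's PickleSerializer:]
    from prefect import task
    from prefect.engine.results import LocalResult
    from prefect.engine.serializers import PandasSerializer, PickleSerializer

    # flow-level default: pandas CSV
    flow.result = LocalResult(serializer=PandasSerializer(file_type="csv"))

    # task-level override: this task's result is pickled instead
    @task(result=LocalResult(serializer=PickleSerializer()))
    def value_count(df):
        ...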

    Daniel Bast

    05/29/2021, 6:38 AM
    Hi, has anyone here written their own small client code to only 1. get a tenant, 2. query for a flow ID, 3. trigger the flow with parameters, depending only on the Python stdlib + urllib3? That would be quite handy for low-latency runs on AWS Lambda without needing to include the heavy prefect package with all its dependencies. Thanks!
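
    [Editor's note: a sketch of step 3 (triggering a run) with only the stdlib plus urllib3, assuming Prefect 1 Cloud's create_flow_run mutation; IDs and the API key are placeholders:]
    import json
    import urllib3

    def trigger_flow_run(flow_id, parameters, api_key):
        http = urllib3.PoolManager()
        resp = http.request(
            "POST",
            "https://api.prefect.io",
            body=json.dumps({
                "query": """
                    mutation($input: create_flow_run_input!) {
                      create_flow_run(input: $input) { id }
                    }
                """,
                "variables": {"input": {"flow_id": flow_id,
                                        "parameters": parameters}},
            }),
            headers={
                "Content-Type": "application/json",
                "Authorization": "Bearer " + api_key,
            },
        )
        return json.loads(resp.data)["data"]["create_flow_run"]["id"]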

    Aurélien Vallée

    05/30/2021, 7:07 AM
    I'm trying to do something like the following, but it does not seem to work (the "multiprocess" parameter is not shown as a parameter in the Prefect UI):
    with Flow("myflow") as flow:
        multiprocess = Parameter("multiprocess")
        with case(multiprocess, True):
            flow.executor = DaskExecutor()
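
    [Editor's note: the executor is resolved when the flow is loaded from storage, not during the run, so it cannot be driven by a Parameter (which only exists once the run has started). A sketch of one workaround: choose the executor from an environment variable at load time; the variable name is hypothetical:]
    import os
    from prefect import Flow
    from prefect.executors import DaskExecutor, LocalExecutor

    with Flow("myflow") as flow:
        ...  # tasks

    # evaluated whenever the flow is imported/loaded from storage
    if os.environ.get("MYFLOW_MULTIPROCESS") == "true":
        flow.executor = DaskExecutor()
    else:
        flow.executor = LocalExecutor()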

    PKay

    05/30/2021, 9:57 PM
    Having a weird error when running a flow from Prefect Cloud. I am able to run the flow locally and register it to Cloud, but when I trigger a run from the cloud, an error prevents one of the tasks from finishing correctly:
    Unexpected error: TypeError('no default __reduce__ due to non-trivial __cinit__')
    Traceback (most recent call last):
      File "/home/USERNAME/.local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/home/USERNAME/.local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 900, in get_task_run_state
        result = self.result.write(value, **formatting_kwargs)
      File "/home/USERNAME/.local/lib/python3.8/site-packages/prefect/engine/results/local_result.py", line 116, in write
        value = self.serializer.serialize(new.value)
      File "/home/USERNAME/.local/lib/python3.8/site-packages/prefect/engine/serializers.py", line 73, in serialize
        return cloudpickle.dumps(value)
      File "/home/USERNAME/.local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/home/USERNAME/.local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
        return Pickler.dump(self, obj)
      File "stringsource", line 2, in pymssql._mssql.MSSQLConnection.__reduce_cython__
    TypeError: no default __reduce__ due to non-trivial __cinit__
    Tried running on both Ubuntu 20.04.02 and Windows 10; both are able to run the flow locally. I even tried using Docker and got the same error. I am using the Python package pymssql to extract data from a database and do some basic transformations with petl. It works fine when I run the flow without any agent/cloud: it's able to extract data and do the transformations. I figure it's something to do with how data is passed between functions.
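
    [Editor's note: the traceback shows cloudpickle failing on a pymssql connection object, which suggests a connection or cursor is being returned from a task and checkpointed as a result. A sketch of the usual fix: keep the connection inside one task and return only plain data; connection details are placeholders:]
    import pymssql
    from prefect import task

    @task
    def extract(query):
        # open and close the connection inside the task; return only
        # picklable Python data, never the connection or cursor itself
        conn = pymssql.connect(server="...", user="...",
                               password="...", database="...")
        try:
            cur = conn.cursor(as_dict=True)
            cur.execute(query)
            return cur.fetchall()
        finally:
            conn.close()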

    matta

    05/30/2021, 10:02 PM
    Looking to turn a scraper into a Prefect flow - anyone got a Dockerfile handy that configures Prefect + Selenium and forces them to play nicely? (no specific issues yet, I just know Selenium is fussy as hell)

Fabrice Toussaint

05/31/2021, 8:46 AM
Hi everyone, how do I use the output of a mapped task as input for another mapped task? I have two mapping steps, where the first one returns a list of lists; in the second one I want to map over each element of those lists. I flattened the list, but for some reason, instead of using the elements, it uses the lists separately.

Kevin Kho

05/31/2021, 3:29 PM
Hi @Fabrice Toussaint, it should work. Could you show me your code?

Fabrice Toussaint

05/31/2021, 4:08 PM
I got a "fake" example:
import requests
from prefect import task, Flow, Parameter, apply_map, flatten, unmapped

url = "https://example.com/api/orders"  # placeholder URL

@task
def offset_for_number_of_pages(pages: int = 1):
    if pages < 1:
        raise ValueError('Cannot be lower than 1!')
    else:
        return [100 * n for n in range(pages)]

@task
def get_items(offset, url):
    """Retrieves the orders response of the API request."""
    request = requests.get(url, params={'offset': offset})
    return request.json()

@task
def do_something(item):
    return item + 1

def execute_for_item(item):
    return do_something(item)

with Flow("example") as flow:
    pages = Parameter('pages', default=1)
    offsets = offset_for_number_of_pages(pages=pages)  # [0, 100, 200, 300, 400] for pages=5
    items = get_items.map(offset=offsets, url=unmapped(url))  # [[0..100], [100..200], [200..300], [300..400]]
    results = apply_map(execute_for_item, item=flatten(items))  # [0..400]
in "execute_for_item" I call multiple tasks, but I did not add all of them in this fake example
from prefect import task, Flow, apply_map, flatten, unmapped, Parameter


@task
def generate_list(amount: int = 1):
    if amount < 1:
        raise ValueError('Cannot be lower than 1!')
    else:
        return [100 * n for n in range(amount)]


@task
def get_items(item):
    """Retrieves the orders response of the API request."""
    return [0, 1, 2, 3, item]


@task
def add_one(item):
    print(item)
    return item + 1


@task
def add_two(item):
    return item + 2


def execute_for_item(item):
    item_plus_one = add_one(item)
    return add_two(item_plus_one)


with Flow("Test") as flow:
    amount = Parameter('amount', default=1)
    l = generate_list(amount=amount)  # [0, 100, 200, 300, 400] for amount=5
    items = get_items.map(item=l)  # [[0, 1, 2, 3, 0], [0, 1, 2, 3, 100], ...]
    results = apply_map(execute_for_item, item=flatten(items))  # should act on each element

flow.run()
this example actually runs
you can see that apply_map doesn't work for execute_for_item, as add_one prints a whole list instead of the individual items
it uses the list and not the elements of the list

Kevin Kho

05/31/2021, 6:16 PM
I see. This may take me a bit to figure out. I'll respond to you tomorrow since today is a US holiday

Fabrice Toussaint

05/31/2021, 6:17 PM
Thank you! Enjoy your day off tomorrow :)
Today I mean!
@Kevin Kho any updates 😄? I hope you had a nice Memorial Day yesterday btw!

Kevin Kho

06/01/2021, 1:28 PM
Just looking at it now
I need to do some tests on my end
You're right that flatten does not work in apply_map. I don't know why; I'll ask the team. But it seems like you'll need an intermediate task to flatten things, like this:
from prefect import Flow, task, case, apply_map

@task
def generate_numbers():
    return [[1], [2, 3], [4, 5, 6]]

@task
def up(x):
    return x + 1

@task
def is_even(x):
    return x % 2 == 0

def up_or_down(x):
    cond = is_even(x)
    with case(cond, True):
        res = up(x)
    return res

@task
def simple_flatten(x):
    # intermediate task that flattens the list of lists in place of flatten()
    return [num for elem in x for num in elem]

with Flow('test') as flow:
    x = generate_numbers()
    y = simple_flatten(x)
    apply_map(up_or_down, y)

flow.run()

Fabrice Toussaint

06/01/2021, 2:01 PM
that works, should've thought of it myself :')
thanks a lot again, a bit weird that flatten doesn't work with apply_map

Kevin Kho

06/01/2021, 2:09 PM
No problem! I think it has to do with the fact that apply_map works on a function and not a task, but I'm not super sure yet.