prefect-community
  • j

    John Ramirez

    02/17/2020, 5:12 PM
    Hey everyone - is there a timeline for when Prefect will be available on Python 3.8?
    j
    c
    • 3
    • 2
  • s

    Stephane Boisson

    02/17/2020, 10:13 PM
    What would be the best practices/patterns for implementing a web crawler with Prefect? Using the LOOP signal to recurse until some kind of depth limit?
    j
    • 2
    • 6
  • s

    Stephane Boisson

    02/17/2020, 10:48 PM
    I noticed that if I perform a map over two lists:
    my_task.map([1, 2, 3], ["A", "B"])
    it will execute 2 tasks, for
    (1, "A"), (2, "B")
    Is there an easy way to map over the 3x2 combinations and get 6 tasks, for
    (1, "A"), (2, "A"), (3, "A"), (1, "B"), (2, "B"), (3, "B")
    ?
    c
    i
    j
    • 4
    • 6
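    One common workaround, sketched here with illustrative names (build_pairs and my_task are not from the original message), is to compute the cross-product in an upstream task and map over the resulting pairs:
    from itertools import product

    from prefect import Flow, task

    @task
    def build_pairs(numbers, letters):
        # materialize the full 3x2 cross-product so it can be mapped over
        return list(product(numbers, letters))

    @task
    def my_task(pair):
        number, letter = pair
        return f"{number}{letter}"

    with Flow("cross-product-map") as flow:
        pairs = build_pairs([1, 2, 3], ["A", "B"])
        results = my_task.map(pairs)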
  • g

    Greg Johnson

    02/18/2020, 2:06 AM
    Hi everyone, thank you for such an awesome product
  • g

    Greg Johnson

    02/18/2020, 2:09 AM
    Is there a tutorial somewhere on how to specify "run one task per GPU" or "one task per worker" at a time? This is mostly with respect to running GPU jobs on a SLURM cluster.
    c
    • 2
    • 2
  • a

    Arsenii

    02/18/2020, 6:22 AM
    Hey all, a quick question. Let's say I
    map
    a function A over a list L, and then want another function B to continue working on the elements of L in parallel after A has finished. Is my understanding correct that, in order to implement this, function A should basically return the same element it got as input and pass it to B? i.e., in Python/pseudocode:
    from prefect import Flow, Parameter, task

    @task
    def func_A(list_element):
        do_stuff(list_element)
        return list_element

    @task
    def func_B(list_element):
        return list_element + 2

    with Flow(...):
        L = Parameter("list", default=[1, 2, 3])
        first_step = func_A.map(L)
        second_step = func_B.map(first_step)
    This seems to work, but I wonder whether it is the most efficient way to do this. I also noticed that, if I have several consecutive `map`s set up in the above manner
    A -> B -> C -> D -> ...
    , it will not begin the next step until the previous step has finished executing for all mapped elements. Is there a way to get around this? Is it because I'm using LocalExecutor and not something like DaskExecutor? Thanks!
    a
    • 2
    • 1
  • c

    Chris O'Brien

    02/19/2020, 1:52 AM
    Hi All, Is it possible to have two different flows on different schedules run in the same script? It seems that it isn’t because
    Flow.run()
    is a blocking call?
    c
    • 2
    • 1
  • n

    Nathan Molby

    02/19/2020, 3:40 PM
    Hi all, the following code does not work:
    from prefect import Flow, task
    
    @task
    def createX():
        return []
    
    @task
    def alterX(x):
        return x.append(5)
    
    @task
    def getZ(x):
        return len(x)
    
    with Flow("Main Flow") as flow:
        x = createX()
    
        alterX(x)
    
        z = getZ(x, upstream_tasks=[alterX])
    
    state = flow.run()
    It should create x first, then alter x, and then get z using the altered x. Instead, it tries to alter x before x has been created. I could add x as an upstream task of alterX, but I thought that should happen automatically because it is a data dependency.
    a
    m
    n
    • 4
    • 5
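    One way to express the intended ordering (a sketch of a common fix): have alterX return the altered list and pass its result to getZ, since list.append returns None and Prefect infers ordering from the data passed between tasks.
    from prefect import Flow, task

    @task
    def createX():
        return []

    @task
    def alterX(x):
        # list.append returns None, so build and return a new list instead
        return x + [5]

    @task
    def getZ(x):
        return len(x)

    with Flow("Main Flow") as flow:
        x = createX()
        altered = alterX(x)   # keep a reference to the bound task
        z = getZ(altered)     # the data dependency enforces the ordering

    state = flow.run()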
  • m

    Mark Koob

    02/19/2020, 3:54 PM
    Hi folks! We use Prefect Core, and I recently updated Prefect from 0.7.1 to 0.9.2. This caused some issues with a task that used a model type that can't be serialized after training:
    @task
    def get_fit_model(model, x, y):
        model.fit(x, y)
        return model.fit_model
    I eventually realized that this was because the model.fit() operation mutates the model object, and later Prefect tries to serialize the mutated object
    model
    , even though only a part of it was used downstream. I was able to get around this by making a deepcopy() of the untrained model, on which the training was performed. I imagine this is due to the "greedy serialization" change Chris White mentioned a month or so back. I suppose the lesson here is that all operands must be serializable at all times. I'm concerned that perhaps I would get better results if I were using result handlers. I'm also curious if this would have been easier to debug if I were running my flow in Prefect Cloud.
    c
    • 2
    • 7
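    The deepcopy() workaround described above might look roughly like this (a sketch; the model object and the fit_model attribute come from the original snippet, the rest is illustrative):
    from copy import deepcopy

    from prefect import task

    @task
    def get_fit_model(model, x, y):
        # train a copy so the original `model` input is never mutated
        # and stays serializable when Prefect handles task inputs/results
        trained = deepcopy(model)
        trained.fit(x, y)
        return trained.fit_model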
  • a

    Amit Singh

    02/19/2020, 7:35 PM
    @here Can anyone help me with this error? Thanks.
    Failed: DID NOT RAISE <class 'prefect.engine.signals.SUCCESS'>
  • a

    Amit Singh

    02/19/2020, 7:37 PM
    ^^ My code is quite simple, and it is giving the above error on the print(result) line:
    from pytest import raises
    from prefect.engine import signals

    with raises(signals.SUCCESS) as excinfo:
        result = some_lib.some_function()
        print(result)

    assert excinfo.type == signals.SUCCESS
    assert excinfo.typename == 'SUCCESS'
    c
    • 2
    • 6
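    For context, pytest reports DID NOT RAISE when the code inside the raises block returns normally instead of raising the signal. A minimal sketch of a passing version (some_function here is a hypothetical stand-in for some_lib.some_function):
    from pytest import raises
    from prefect.engine import signals

    def some_function():
        # the test only passes if the code under test raises the signal
        raise signals.SUCCESS("finished")

    def test_signals_success():
        with raises(signals.SUCCESS) as excinfo:
            some_function()
        assert excinfo.type == signals.SUCCESS
        assert excinfo.typename == "SUCCESS"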
  • c

    Cab Maddux

    02/20/2020, 1:15 PM
    Hi! Quick question: if I'm trying to pass a secret to the credentials property of the GCSUpload task, is this how I should do it?
    my_secret = Secret('MY_SECRET') # Where the JSON credential for a GCP service account has been copied to a Secret named 'MY_SECRET' via the cloud UI
    
    upload_task = GCSUpload(project='foo', bucket='bar', ...)
    upload_task(data='This is my data', blob='path/to/blob.txt', credentials=my_secret)
  • c

    Cab Maddux

    02/20/2020, 1:16 PM
    I'm having trouble with the upload when running via the Kubernetes agent/executor and using the Cloud secret, but using the same credentials via the GOOGLE_APPLICATION_CREDENTIALS env variable when running locally works fine.
    j
    • 2
    • 1
  • t

    trapped

    02/20/2020, 3:12 PM
    hi guys, is there any way to speed up task mapping (i.e. avoid serializing the whole result and sending it back to Prefect)? I've got a ~1M item list/tuple computed with Dask Bags which I'd like to map over, and between the bag's computation finishing and the first mapped task starting I'm observing a good deal of time spent waiting for the complete result list.
    z
    j
    • 3
    • 10
  • r

    RyanB

    02/20/2020, 7:28 PM
    I am getting "Event loop was unresponsive in Scheduler for 18.15s." followed by "distributed.scheduler - ERROR - Couldn’t gather keys" for a large data set. This causes a loop in the Prefect flow, as it reschedules the tasks it couldn’t gather and eventually hits the same error. Any ideas on what to look for or any configuration options? I am using dask-kubernetes 0.10.1 and dask 2.11.0 in AWS.
  • l

    Luke Orland

    02/21/2020, 4:24 PM
    I have some questions about
    FargateTaskEnvironment
    :
    1. Can it just run an existing ECS task definition, or does it always register a new task definition based on the specified family/taskDefinition?
    2. If it can just run an existing ECS task definition, what would the minimal necessary arguments be? Something like
    FargateTaskEnvironment(taskDefinition='my-ecr-task-definition')
    , assuming AWS credentials are accessible by boto3 via the environment?
    3. If it always generates/registers a new task definition, what would the minimal set of required arguments be? I could probably figure that out by running
    boto3.client('ecs').run_task(*args)
    until I get the desired task definition running and then translating that into
    FargateTaskEnvironment
    arguments. Curious if someone has gone through this process already.
    b
    j
    • 3
    • 12
  • p

    Pete Fein

    02/22/2020, 3:42 PM
    How do you unit test flows? I didn't see anything about it in the documentation...
    j
    c
    s
    • 4
    • 3
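    A sketch of one common approach (add_one and the flow here are illustrative): call a task's run() method directly to unit test its logic, and run the flow in-process to assert on the final state.
    from prefect import Flow, task

    @task
    def add_one(x):
        return x + 1

    with Flow("unit-test-example") as flow:
        total = add_one(1)

    def test_task_logic():
        # test the task's logic outside of a flow run
        assert add_one.run(1) == 2

    def test_flow_run():
        state = flow.run()
        assert state.is_successful()
        assert state.result[total].result == 2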
  • n

    Nate Joselson

    02/24/2020, 4:29 PM
    Hi! I have some flows that run every hour according to a cron job. My question is whether it is possible to have one of the flows repeat again after 30 minutes (so essentially running that particular flow twice in the hour). I understand that I could use the scheduler in prefect, or just create a separate cron job that runs half-hourly, but I was wondering if there was a third option as well in prefect. Thanks!
    j
    • 2
    • 4
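    One in-Prefect option (a sketch using clock-based schedules; the flow name is illustrative): give that particular flow a Schedule with two CronClocks, one firing on the hour and one on the half hour.
    from prefect import Flow
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    # fire at :00 and :30 of every hour, only for this one flow
    schedule = Schedule(clocks=[CronClock("0 * * * *"), CronClock("30 * * * *")])

    with Flow("twice-hourly", schedule=schedule) as flow:
        ...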
  • i

    itay livni

    02/24/2020, 7:07 PM
    Hi - I have
    task
    in a
    Flow
    that takes a
    list
    . How do I set the
    List
    Trigger
    to run on
    any_successful
    upstream tasks? https://docs.prefect.io/core/concepts/tasks.html#collections I tried a couple of approaches, including def_list = List(trigger=any_successful) and then calling
    munged_defs_df = definitions.munge_dfs(def_list([df1, df2, df3]))
    a
    • 2
    • 6
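    A hedged sketch of attaching the trigger to the consuming task rather than to a separate List task (load_df, the flow name, and the task bodies are illustrative; this shows only the wiring, not how results from failed upstream tasks appear in the list):
    from prefect import Flow, task
    from prefect.triggers import any_successful

    @task
    def load_df(name):
        ...

    @task(trigger=any_successful)
    def munge_dfs(dfs):
        # runs as long as at least one upstream task succeeded;
        # entries from failed upstreams would still need handling here
        return dfs

    with Flow("munge") as flow:
        df1, df2, df3 = load_df("a"), load_df("b"), load_df("c")
        munged_defs_df = munge_dfs([df1, df2, df3])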
  • j

    Justin Shimkovitz

    02/24/2020, 7:27 PM
    Hi, I am new to prefect and just finished reading through the documentation. Question about
    map
    responses. When I send a list of elements to a task and one element sent to the task finishes and returns a response, is it possible to move that response to the next task before the rest of the original elements finish in the first task? Thanks in advance!
    c
    • 2
    • 2
  • j

    Jeff Brainerd

    02/24/2020, 10:21 PM
    Random PSA: our team ran into issues installing
    psutil
    (a dependency of
    distributed
    ) on Catalina (OSX 10.15). This thread helped us get past it: https://github.com/giampaolo/psutil/issues/1632
    :upvote: 3
  • j

    Jeremiah

    02/25/2020, 5:46 PM
    has renamed the channel from "prefect-core" to "prefect-community"
    :marvin: 2
    👍 7
  • l

    Luke Orland

    02/25/2020, 9:56 PM
    I'm a little confused about the
    upstream_tasks
    arg on
    Task.map
    . "a list of upstream dependencies to map over". What would mapping over the upstream dependencies do?
    👀 2
    n
    • 2
    • 4
  • a

    Andrew Schechtman-Rook

    02/26/2020, 2:30 AM
    Hi folks! I've been poking around a bunch with Prefect for the last couple months, and over that time I've written a few utilities/extensions to help Prefect work better for my particular data and modeling workflows. In case anyone else may be able to benefit I've pulled them together into a small package — https://pypi.org/project/prefect-ds/. Please feel free to take a look 🙂
    👊 5
    👏 5
    :marvin: 4
    ❤️ 7
    🚀 7
    j
    c
    • 3
    • 19
  • j

    John Ramirez

    02/26/2020, 6:40 PM
    If I want to improve performance through server selection, is memory-optimized or compute-optimized better for Dask?
    j
    • 2
    • 2
  • n

    Nico Aiello

    02/26/2020, 9:14 PM
    Hey everyone, I'm new to the prefect community and was hoping to get advice on how to use prefect for my particular use case. Suppose we first have some batcher task that takes a set of items and batches them into groups. We then need a task to run on each batch. (So far, this can be handled using prefect's mapping). Next, we then want to run a task for each item in the batch and we'd like to be able to run this as soon as each batch finishes, rather than waiting for all batches to finish. To visualize a simpler version:
  • n

    Nico Aiello

    02/26/2020, 9:17 PM
                                   --------> item a1
            ------------> batch a  --------> item a2
    batcher                        --------> item a3
            ------------> batch b  --------> item b1
                                   --------> item b2
    Basically, I'd like Prefect to be aware that once the batching happens, we now have two independent, parallelizable flows happening - so if batch a finishes, item a1 can begin even if batch b is still in progress.
    j
    h
    • 3
    • 11
  • j

    John Ramirez

    02/27/2020, 3:04 PM
    Hey everyone, is there a task setting to prevent a mapping task from running until all upstream tasks are complete?
    d
    • 2
    • 18
  • m

    Mark Williams

    02/27/2020, 5:13 PM
    I'm trying to determine the best route for reading in a list of text files using Prefect. Should I use the LOOP signal to loop over the list of files, or should I build my flow around a single file, so that each flow run represents a specific file that was processed?
    i
    • 2
    • 8
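    Mapping over the list of file paths is a third pattern worth sketching (read_file and the example paths are illustrative, not from the original message):
    from prefect import Flow, Parameter, task

    @task
    def read_file(path):
        # one mapped task run per file
        with open(path) as f:
            return f.read()

    with Flow("read-files") as flow:
        paths = Parameter("paths", default=["a.txt", "b.txt", "c.txt"])
        contents = read_file.map(paths)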
  • a

    Andor Tóth

    02/28/2020, 2:20 PM
    hi everyone
    👋 6
    i
    j
    • 3
    • 5
i

itay livni

02/28/2020, 2:29 PM
@Andor Tóth
from prefect import task

@task
def add_something(a):
    n = a + 1
    return n

# call the task's underlying run() method directly, e.g.:
n = add_something.run(1)
👍 1
a

Andor Tóth

02/28/2020, 2:31 PM
aha, I wasn't aware that each task has a run method
thanks!
I was looking for something I'm used to with Airflow 🙂
j

Jeremiah

02/28/2020, 2:51 PM
Don’t worry, things are easier here :)