prefect-community
  • h

    Hui Zheng

    01/21/2021, 1:15 AM
    Hello, a few of our flow runs failed between 4:30 and 4:50 PST with the same issue: a failure in retrieving the task `GCS_result`. It happened to flow runs on different agents and k8s clusters. I wonder if there is a Prefect Cloud infrastructure issue?
    c
    • 2
    • 4
  • s

    Sagun Garg

    01/21/2021, 3:23 AM
    Request for help: `S3Result` returns a "No credentials found" error while running on an AWS EKS Fargate profile, even though we are also supplying AWS credentials in config.toml. https://docs.prefect.io/api/latest/engine/results.html#s3result
    Error: NoCredentialsError('Unable to locate credentials')
    Reference Blog: https://makeitnew.io/prefect-a-modern-python-native-data-workflow-engine-7ece02ceb396
    ```
    Unexpected error while reading from result handler: NoCredentialsError('Unable to locate credentials')
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/results/s3_result.py", line 136, in read
        Bucket=self.bucket, Key=location, Fileobj=stream
      File "/usr/local/lib/python3.7/site-packages/boto3/s3/inject.py", line 678, in download_fileobj
        return future.result()
      File "/usr/local/lib/python3.7/site-packages/s3transfer/futures.py", line 106, in result
        return self._coordinator.result()
      File "/usr/local/lib/python3.7/site-packages/s3transfer/futures.py", line 265, in result
        raise self._exception
    ```
  • l

    Loc Nguyen

    01/21/2021, 3:26 AM
    Hi, I am having some issues trying Prefect on Docker: when I pull the image down and run it, it dies immediately. Here is my docker command:
    sudo docker run -d prefecthq/prefect:latest
    Checking the docker logs, I have this:
    tini (tini version 0.18.0)
    Usage: tini [OPTIONS] PROGRAM -- [ARGS] | --version
    
    Execute a program under the supervision of a valid init process (tini)
    
    Command line options:
    
      --version: Show version and exit.
      -h: Show this help message and exit.
      -s: Register as a process subreaper (requires Linux >= 3.4).
      -p SIGNAL: Trigger SIGNAL when parent dies, e.g. "-p SIGKILL".
      -v: Generate more verbose output. Repeat up to 3 times.
      -w: Print a warning when processes are getting reaped.
      -g: Send signals to the child's process group.
      -e EXIT_CODE: Remap EXIT_CODE (from 0 to 255) to 0.
      -l: Show license and exit.
    
    Environment variables:
    
      TINI_SUBREAPER: Register as a process subreaper (requires Linux >= 3.4).
      TINI_VERBOSITY: Set the verbosity level (default: 1).
      TINI_KILL_PROCESS_GROUP: Send signals to the child's process group.
    It is quite unclear what the issue is 😞
    c
    • 2
    • 2
  • e

    eamonn faherty

    01/21/2021, 11:11 AM
    Hi all. I have been using Spotify's Luigi to build workflows with around 15,000 tasks. I am hitting high CPU and memory usage in Luigi and am looking around for alternatives. Would Prefect work at the scale of 15k tasks?
    c
    • 2
    • 1
  • v

    Vitaly Shulgin

    01/21/2021, 3:07 PM
    Hello Team, I subclassed PostgresTask to provide the DB connection details from configuration, and ran into a problem with the argument `fetch` when Prefect calls the run method.
    👀 1
    ✅ 1
    m
    • 2
    • 9
  • d

    Dilip Thiagarajan

    01/21/2021, 3:48 PM
    Hi team, is there any way I can pipe SLURM logs to a Prefect server?
    m
    • 2
    • 1
  • r

    Ryan Kelly

    01/21/2021, 4:49 PM
    Hi all, we’ve done a POC of Prefect and love what the tool has to offer. We've been able to get flows running and tied to Prefect Cloud, but I am struggling to understand the scaling and deployment workflow from the documentation and articles online. Would love to write one up and post it on Medium once figured out! My questions are:
    • Is this structure an expected structure?
    • How are the new jobs supposed to be registered through CI/CD?
    • How are dependencies across job files managed? For example, if there are multiple fact tables dependent on d_sample, how do I set up the fact job files?
    • Does the agent have to be restarted every time a new flow file is registered?
    m
    b
    d
    • 4
    • 11
  • a

    Alex Rud

    01/21/2021, 5:00 PM
    Hello… Is there a playbook to implement authentication for Prefect Server similar to the experience you get in the cloud? I’m dealing with PHI data and I can’t have an open door to the console for everyone at my company
    🙌 1
    m
    j
    • 3
    • 9
  • p

    Pedro Machado

    01/21/2021, 5:44 PM
    Hi there. Does Prefect Cloud support IP whitelisting at the tenant level? For example, make sure that our token can only be used from certain IPs?
    m
    • 2
    • 1
  • l

    Lukasz Mentel

    01/21/2021, 7:13 PM
    👋 Hi everyone!
    👋 3
  • a

    alex

    01/21/2021, 7:20 PM
    Hello! I'm looking to get a better understanding of how exactly a flow runs. I ran into this issue locally where I deployed 2 flows by calling `demo('flow1')` and `demo('flow2')`. I then changed the signature of DemoClass to take in another param, and reflected that change, i.e. `tc = DemoClass(source_name, "new_param")`, and ran `demo('flow1')` again. This caused flow2 to start failing due to it missing the new param, even though the flow was not directly modified. I'm wondering what exactly happened here? If I'm using a venv, is that venv's Python being used? If I `pip install --upgrade` a package with breaking changes, would all my flows immediately start failing? Or if I deployed a flow after the upgrade?
    # myprefect/demos/demo_class.py
    class DemoClass:
        def __init__(self, source_name):
            self.source_name = source_name
            # self.new_param = new_param (added later in signature)
    
        def run(self):
            return self.source_name

    # flow definition file
    import datetime

    from myprefect.demos.demo_class import DemoClass
    from prefect import Flow, task
    from prefect.schedules import IntervalSchedule
    
    @task(log_stdout=True)
    def initialize_and_run_class(source_name):
        tc = DemoClass(source_name)
        r = tc.run()
        print(r)
        return r
    
    
    def demo(data_source):
        with Flow(
            f"flow - {data_source}",
            schedule=IntervalSchedule(interval=datetime.timedelta(seconds=60)),
        ) as f:
            tsk = initialize_and_run_class(data_source)
    
        f.register(project_name="demo")
    m
    • 2
    • 3
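One possible explanation for the behaviour alex describes, assuming the registered flows are stored as pickles and `DemoClass` lives in an importable module: Python's pickle format records a class by its module path and name, not by its source code, so a stored object picks up whatever version of the class is importable at load time. The sketch below shows only that mechanism, using the standard-library `pickle` with `collections.OrderedDict` as a stand-in class. It does not exercise Prefect or its storage code.

```python
import pickle
from collections import OrderedDict

# Pickling a class stores only a reference (module name + class name).
payload = pickle.dumps(OrderedDict)

print(len(payload))               # small: no class body is serialized
print(b"collections" in payload)  # the module name is embedded
print(b"OrderedDict" in payload)  # the class name is embedded

# Unpickling imports the module and looks the name up again, so the result
# is whatever class currently lives at that import path.
restored = pickle.loads(payload)
print(restored is OrderedDict)
```

If this is what happened, a flow registered earlier would call the class with its old arguments against the new definition on disk, which matches the missing-parameter failure reported for flow2.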
  • l

    Lukasz Mentel

    01/21/2021, 7:22 PM
    I've been using prefect for some time and I really enjoy the experience. There was a bit of a learning curve but once I got the concepts the progress is steady. I started with the functional API but I realized I need a bit more flexibility to dynamically create the flows. I'm currently trying to find the imperative equivalent of the following code:
    with Flow("my-flow") as flow:
        data_batch = fetch_dataframe(path)
    
        inputs = fetch_series(data_batch, "series-name")
        result = compute_vfr(inputs)
    
        power = fetch_series(data_batch, "power")
        cop = compute_cop(result, power)
  • l

    Lukasz Mentel

    01/21/2021, 7:25 PM
    The main challenge is the task `compute_cop`, which has two dependencies. When I try to replicate that with `Flow.add_edge()` I can only specify a single `key` to pass the data between tasks. Does anyone know how to solve or get around this?
    a
    m
    • 3
    • 5
  • m

    Marco Palmeri

    01/21/2021, 8:50 PM
    Hi Prefect folks. Quick question. Does the prefect.context.today variable respect the timezone as specified when instantiating a Schedule?
    m
    j
    • 3
    • 8
  • j

    jack

    01/21/2021, 9:39 PM
    How does someone create an account for this slack workspace? Clicking "create account" from prefect-community.slack.com says you need an `@prefect.io` email address, or it says to contact an admin.
    a
    • 2
    • 1
  • b

    Boris Gaganelov

    01/21/2021, 9:44 PM
    Hey guys, I'm stuck on something rather weird and trying to figure out how to resolve it; hopefully someone here might be able to help. After migrating a flow to use `GitLab` as a Storage, things seem to be working fine except for flows which actually import things from a GitLab subdirectory. So I keep getting `ModuleNotFoundError("No module named 'tasks'")`. My Storage repository file structure looks a bit like this:
    root-of-repo/
    ├── flow_hello.py
    └── tasks/
        ├── __init__.py
        └── hello.py
    And the in-code flow config looks like this:
    flow.storage = GitLab(
        repo="XXXXX",
        host="XXXXX",
        path="flows/flow_hello.py",
        secrets=["GITLAB_ACCESS_TOKEN"],
        ref="master",
    )
    But it seems to be failing to resolve the import so I'm a little bit confused. Am I somehow able to add it to the path of the agent despite it being a GitLab storage?
    m
    • 2
    • 4
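The `ModuleNotFoundError` in Boris's message is what Python's import system raises when no entry on `sys.path` contains the requested package. The sketch below reproduces that mechanism in isolation with a temporary directory. The package name `demo_tasks` is hypothetical, and the example involves neither GitLab storage nor a Prefect agent, so it only illustrates what "adding it to the path" would have to achieve on the machine that runs the flow.

```python
import importlib
import sys
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as repo_root:
    # Recreate a layout like root-of-repo/<package>/hello.py.
    package_dir = Path(repo_root) / "demo_tasks"
    package_dir.mkdir()
    (package_dir / "__init__.py").write_text("")
    (package_dir / "hello.py").write_text("def say_hello():\n    return 'hello'\n")

    # The directory is not on sys.path yet, so the import fails.
    try:
        importlib.import_module("demo_tasks.hello")
        found_before = True
    except ModuleNotFoundError:
        found_before = False

    # Putting the repository root on sys.path makes the package importable.
    sys.path.insert(0, repo_root)
    importlib.invalidate_caches()
    hello = importlib.import_module("demo_tasks.hello")
    greeting = hello.say_hello()
    sys.path.remove(repo_root)

print(found_before, greeting)
```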
  • m

    Marwan Sarieddine

    01/21/2021, 11:08 PM
    Hi folks, I have a very general question/clarification concerning the importing and handling of the Prefect `context` object. Is there a difference between importing the `context` globally vs. from within a task? i.e.
    import prefect
    
    @task
    def t():
        prefect.context.get("...")
    vs
    @task
    def t():
        import prefect
        prefect.context.get("...")
    Also, in a good portion of the docs the `context` is used as `prefect.context`. Is that purely for readability reasons, or is there a side effect of using `from prefect import context` that one should be aware of? (My current intuition/understanding is that there shouldn't be any difference here)
    m
    a
    • 3
    • 7
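On the pure-Python side of Marwan's question, an `import` statement inside a function does not create a second copy of a module: after the first import, Python returns the cached object from `sys.modules`. A small sketch using the standard-library `json` module as a stand-in for `prefect`, so it illustrates import semantics only and says nothing about Prefect internals:

```python
import json
import sys


def imported_inside():
    # Re-importing inside a function just looks the module up in sys.modules.
    import json as local_json
    return local_json


def from_import_inside():
    # `from module import name` binds the object that the module attribute
    # points to at the time of the import.
    from json import decoder
    return decoder


same_module = imported_inside() is json is sys.modules["json"]
same_attribute = from_import_inside() is json.decoder
print(same_module, same_attribute)
```

The two spellings can only diverge if the module-level name is later rebound to a different object, because `from ... import name` keeps a reference to the object that existed when the import ran.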
  • m

    Mark McDonald

    01/21/2021, 11:53 PM
    Hi - I'm currently upgrading to version 0.14, using ECSRun config and S3 storage. I used to use Docker storage. With Docker storage, I was able to define where my flows were located and executed from within my image. It seems like with S3 storage, I lose control over where the flow files are executed from because you all take care of downloading them into the image. From what I can tell, with S3 storage, the flows are being executed from inside of "tmp/" (example: /tmp/prefect-b0r890j3). Is my understanding of 'tmp' correct? Is there a way to override this location and have you download the flow files elsewhere?
    When I develop locally, at the root of my project, I have a directory called "src", where I store my flow files. Within "src" I also have a sub-directory called "helpers". Inside of "helpers" I store supporting code that is not part of a flow definition. If all my code were located in a single flow file, I wouldn't be concerned with where the flow is being executed from. However, because I'm working with helper code/files (like the example below), it's a challenge to not be able to control where the flow is executed from. Any advice on this?
    path = os.path.join(os.getcwd(), "helpers/query_info.yaml")
    with open(path, "r") as stream:
        data_loaded = yaml.safe_load(stream)
    m
    • 2
    • 11
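The snippet in Mark's message builds its path from `os.getcwd()`, so the result depends on the directory the flow process happens to start in. One possible workaround, sketched under the assumption that the helper files are already present on the execution image at a known directory, is to resolve them against an explicit base directory instead. `HELPERS_DIR`, `resolve_helper`, and the `/opt/project/src/helpers` path are hypothetical names chosen for illustration, not Prefect settings or APIs.

```python
import os
from pathlib import Path


def resolve_helper(relative_name: str, default_base: str = ".") -> Path:
    """Return an absolute path to a helper file.

    The base directory comes from the (hypothetical) HELPERS_DIR environment
    variable, falling back to default_base. The current working directory
    only matters if the chosen base is itself a relative path.
    """
    base = Path(os.environ.get("HELPERS_DIR", default_base))
    return (base / relative_name).resolve()


# Example: pin the base directory once (hypothetical location on the image).
os.environ["HELPERS_DIR"] = "/opt/project/src/helpers"
helper_path = resolve_helper("query_info.yaml")
print(helper_path)
```

This does not change where the flow file itself is downloaded; it only removes the dependency on the working directory when locating supporting files.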
  • j

    Jeremiah

    01/22/2021, 1:05 AM
    Hey everybody! We’re hosting a happy hour to celebrate Prefect’s birthday and we’d love you to join us. Please feel free to pop in here, hope to see you! https://spatial.chat/s/prefect-birthday-party
    🎂 2
    :upvote: 2
  • j

    Josh

    01/22/2021, 5:05 AM
    Has anyone been able to get GitHub Actions to build and deploy flows? I’ve been struggling with it for a while and just can’t seem to get the images pushed. I’m using GitHub Actions with Google Cloud registry.
    • 1
    • 2
  • a

    Anze Kravanja

    01/22/2021, 6:06 AM
    Hello! I’m pretty new to the tool but love it so far! I have a question: if I have a registered flow with parameters, can I schedule 2 flow runs on two different schedules with different parameters? I know I can easily do a one-time run in the UI with different parameters.
    m
    n
    +2
    • 5
    • 8
  • m

    Matthew Blau

    01/22/2021, 2:58 PM
    Hello all, I am in the process of migrating over code to work with Prefect. All of our existing code is in docker containers. My assumption is that I would need a docker-agent to run the code once I rebuild the containers to have prefect inside of it. Is this correct? We are looking for the ability to restart different parts of the ETL task upon failure like prefect provides. Am I correct? Thank you all in advance!
    m
    • 2
    • 11
  • c

    ciaran

    01/22/2021, 4:58 PM
    Turns out this was caused by me having `,`s at the end of each line, if someone finds parameters suddenly getting converted to tuples!
    with Flow("This is a test") as flow:
        path = Parameter(name="path", default="blah"),
        variable = Parameter(name="variable", default="blah"),
        source_crs = Parameter(name="source_crs", default="epsg:4326")
        dataset = open__file(path)
    sigh it's Friday.
    👀 1
    n
    • 2
    • 3
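The trailing-comma pitfall ciaran describes is plain Python behaviour rather than anything Prefect-specific: a comma at the end of an assignment builds a one-element tuple. A minimal sketch, using ordinary strings in place of `Parameter` objects (the variable names simply mirror the message):

```python
# A trailing comma turns the right-hand side into a one-element tuple.
path = "blah",             # tuple: ("blah",)
variable = "blah",         # tuple: ("blah",)
source_crs = "epsg:4326"   # no trailing comma, so this stays a str

print(type(path).__name__)
print(type(source_crs).__name__)

# Dropping the comma, or unpacking the tuple, recovers the intended value.
fixed_path = "blah"
(unpacked,) = path
print(unpacked == fixed_path)
```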
  • c

    ciaran

    01/22/2021, 5:14 PM
    I should point out `Tuple` is not what I want
    s
    • 2
    • 4
  • m

    Matt Gordon

    01/22/2021, 6:46 PM
    Hi Pre-friends. I wonder if I could ask for some help. I am working on a project using Prefect Core, and am trying to create a Flow-of-Flows. The documentation says of `StartFlowRun`:
    Task used to kick off a flow run using Prefect Core’s server or Prefect Cloud. If multiple versions of the flow are found, this task will kick off the most recent unarchived version.
    (Emphasis mine.) The problem I’m having is this: if I instantiate StartFlowRun with a `project_name`, I get:
    Malformed response received from Cloud - please ensure that you have an API token properly configured.
    If I instantiate it without specifying a `project_name`, at runtime I see:
    raise ValueError("Must provide a project name.")
    raised from `StartFlowRun`
    n
    • 2
    • 10
  • m

    Matt Gordon

    01/22/2021, 6:47 PM
    I will try to pull together a small repro if that will help, but I’m looking for guidance about this because the API docs specifically say `project_name` is optional.
  • m

    Matt Gordon

    01/22/2021, 6:49 PM
    from prefect import Flow, Task, task, Parameter
    from prefect.tasks.prefect import StartFlowRun
    
    @task
    def first(k):
        return {k: 10}
    
    @task
    def second(foo):
        foo['quux'] = 20
        foo['foo'] += 3
        return foo
    
    with Flow('f1') as f1:
        p = Parameter('k', default='foo')
        x = first(p)
    
    with Flow('f2') as f2:
        foo = Parameter('foo', default={'foo': 20})
        y = second(foo)
    
    
    ft1 = StartFlowRun(flow_name='f1', wait=True)  # project_name here or not?
    ft2 = StartFlowRun(flow_name='f2', wait=True)
    with Flow('f3') as f3:
        _r = ft1()
    r = f3.run()
  • m

    Matt Gordon

    01/22/2021, 6:49 PM
    This is the scratch code I’m working with
  • m

    Matt Gordon

    01/22/2021, 6:49 PM
    Please let me know if there’s a better place to post this, thanks in advance for your help.
  • j

    James Phoenix

    01/22/2021, 7:23 PM
    from prefect import Task
    
    
    def learning(task, old_state, new_state):
        if isinstance(new_state, state.Success):
            print("Yay")
        return new_state
    
    class MyTask(Task):
        def run(self):
            return "hello"
    
    task_1 = MyTask()
    flow = Flow(name="my_flow", tasks=[task_1], state_handlers=[learning])
    state = flow.run()
    m
    • 2
    • 4
m

Michael Adkins

01/22/2021, 7:31 PM
Hi James! With the addition of `from prefect.engine import state` and `from prefect import Flow` this runs as expected
[2021-01-22 13:30:33] INFO - prefect.FlowRunner | Beginning Flow run for 'my_flow'
/Users/michaeladkins/prefect/core/src/prefect/engine/flow_runner.py:235: UserWarning: prefect.engine.executors.LocalExecutor has been moved to `prefect.executors.LocalExecutor`, please update your imports
  executor = prefect.engine.get_default_executor_class()()
[2021-01-22 13:30:33] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-01-22 13:30:33] DEBUG - prefect.FlowRunner | Flow 'my_flow': Handling state change from Scheduled to Running
[2021-01-22 13:30:33] INFO - prefect.TaskRunner | Task 'MyTask': Starting task run...
[2021-01-22 13:30:33] DEBUG - prefect.TaskRunner | Task 'MyTask': Handling state change from Pending to Running
[2021-01-22 13:30:33] DEBUG - prefect.TaskRunner | Task 'MyTask': Calling task.run() method...
[2021-01-22 13:30:33] DEBUG - prefect.TaskRunner | Task 'MyTask': Handling state change from Running to Success
[2021-01-22 13:30:33] INFO - prefect.TaskRunner | Task 'MyTask': Finished task run for task with final state: 'Success'
[2021-01-22 13:30:33] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-01-22 13:30:33] DEBUG - prefect.FlowRunner | Flow 'my_flow': Handling state change from Running to Success
Yay
Although note you are overriding your global `state` variable, which should contain the Prefect states, with the result of your flow
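The shadowing Michael points out can be reproduced without Prefect. In this sketch, `state` is a stand-in namespace playing the role of the `prefect.engine.state` module, and `run_flow` is a hypothetical substitute for `flow.run()`; both are assumptions made for illustration only.

```python
from types import SimpleNamespace


class Success:
    """Stand-in for a Prefect state class."""


# Plays the role of `from prefect.engine import state`.
state = SimpleNamespace(Success=Success)


def learning(task, old_state, new_state):
    # Looks up the global name `state` every time the handler is called.
    return isinstance(new_state, state.Success)


def run_flow():
    # Hypothetical substitute for flow.run(): returns a final state object.
    return Success()


first_call = learning(None, None, Success())  # `state` is still the namespace

state = run_flow()  # rebinds the global name, as `state = flow.run()` does

try:
    learning(None, None, Success())
    second_call = "ok"
except AttributeError as exc:
    # The Success instance has no attribute named `Success`.
    second_call = f"AttributeError: {exc}"

print(first_call, second_call)
```

Binding the result to a different name, such as `flow_state = flow.run()`, avoids the clash if the handler needs to run again later in the same session.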
j

James Phoenix

01/23/2021, 1:31 PM
Prefect, thanks @Michael Adkins