• Darragh

    Darragh

    2 years ago
    Has anyone tried using Prefect with an AWS Application Load Balancer? It keeps failing the health check, regardless of what path I give it. The EC2 instance it’s pointing at is accessible and responds with a 200 to http://IP:8080, but the health check still fails. Not saying it’s a Prefect problem, but I’m curious if anyone has hit this?
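    (For anyone hitting the same thing later: a minimal sketch of pointing the target group's health check at the UI's actual port and path with boto3, assuming a port/path mismatch is the cause; the ARN and values below are placeholders, not taken from this setup.)
    import boto3

    elbv2 = boto3.client("elbv2")
    elbv2.modify_target_group(
        TargetGroupArn="arn:aws:elasticloadbalancing:...",  # placeholder ARN
        HealthCheckProtocol="HTTP",
        HealthCheckPort="8080",    # the port the UI actually answers on
        HealthCheckPath="/",       # a path that returns 200 when hit directly
        Matcher={"HttpCode": "200"},
    )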
    Darragh
    s
    +1
    13 replies
  • b

    Ben Fu

    2 years ago
    Hello, when moving from 0.12.x to 0.13.x, what is the proper way to migrate the existing flows in the database? I cannot run the Alembic migrations since the old migrations don't exist in the new server repo
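    (A hedged sketch of the usual fallback when the old migrations can't be run: re-register the flows against the new 0.13 Server so it rebuilds their metadata there; the module and project name below are placeholders.)
    from my_flows import flow  # hypothetical module holding an existing Flow object

    flow.register(project_name="my-project")  # placeholder project, created beforehand in the new Server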
    b
    nicholas
    2 replies
  • a

    alex

    2 years ago
    Hello, I have a bunch of tasks called sequentially in my flow. If any of them fails, I want to execute a failure_recovery task. What would be the most concise way of expressing this? Also, sometimes I may want to execute the failure_recovery task directly (this is specified as a param).
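    (A minimal sketch of one way to express this in Prefect 0.x, using the any_failed trigger so a recovery task runs whenever any of its upstream tasks fails; task names are illustrative.)
    from prefect import Flow, task
    from prefect.triggers import any_failed

    @task
    def step_one():
        return 1

    @task
    def step_two(x):
        return x + 1

    @task(trigger=any_failed)
    def failure_recovery():
        print("running recovery")

    with Flow("sequential-with-recovery") as flow:
        a = step_one()
        b = step_two(a)
        failure_recovery(upstream_tasks=[a, b])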
    a
    nicholas
    5 replies
  • r

    Riley Hun

    2 years ago
    Hello everyone, I just have a quick question (hopefully). I took some time to read the Prefect Core documentation from beginning to end, and there are still a few concepts I'm struggling to understand. The first thing I don't quite get is how set_upstream and set_downstream work. Here is an ETL flow I created:
    with Flow("Thinknum ETL") as flow:
    
        token = token_task(
            version=version,
            client_id=client_id,
            client_secret=client_secret
        )
    
        history = history_task(
            dataset_id=dataset_id,
            version=version,
            token=token
        )
    
        loaded_dates = loaded_dates_task(
            bucket_name=bucket_name,
            dataset_id=dataset_id
        )
    
        dates_to_load = get_dates_to_load(
            already_loaded_dates=loaded_dates,
            history=history
        )
    
        datasets_to_load = load_to_gcs_task.map(
            dataset_id=unmapped(dataset_id),
            date=dates_to_load,
            version=unmapped(version),
            token=unmapped(token),
            bucket_name=unmapped(bucket_name)
        )
    It DOES seem to work fine, but I don't know when, or if, I should be applying set_upstream/set_downstream.
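    (For context, a minimal sketch of when set_upstream typically matters in Prefect 0.x: passing data between tasks, as the flow above does, already creates the edges, so explicit set_upstream/set_downstream is mainly for ordering tasks that share no data; task names are illustrative.)
    from prefect import Flow, task

    @task
    def create_table():
        print("creating table")

    @task
    def load_data():
        print("loading data")

    with Flow("state-only-dependency") as flow:
        t1 = create_table()
        t2 = load_data()
        t2.set_upstream(t1)  # no data is passed, so the ordering must be declared explicitly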
    r
    i
    3 replies
  • r

    Riley Hun

    2 years ago
    Another quick question: I've decided to split my Prefect tasks into multiple Python files to keep things organized and modular. When I submit the flow to the Dask executor, I get a ModuleNotFoundError. Do I need to dockerize the flow and specify Python paths for this to work? Thanks in advance! My folder tree looks something like this:
    ├── alternative_data_pipelines
    │   ├── thinknum
    │   │   ├── __init__.py
    │   │   └── thinknum.py
    │   └── utils
    │       ├── __init__.py
    │       ├── logging.py
    │       ├── snowflake.py
    │       └── utils.py
    ├── requirements.txt
    ├── setup.py
    └── thinknum_flow.py
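    (A minimal sketch of one way around the ModuleNotFoundError without Docker, assuming the package can be installed into every Dask worker's environment, e.g. with pip install . ; the names follow the tree above and the version is a placeholder.)
    # setup.py
    from setuptools import setup, find_packages

    setup(
        name="alternative_data_pipelines",
        version="0.1.0",           # placeholder
        packages=find_packages(),  # assumes alternative_data_pipelines/ has an __init__.py
    )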
    r
    Dylan
    +2
    18 replies
  • Jeremiah

    Jeremiah

    2 years ago
    Sorry for the late notice, but Laura is giving a live stream RIGHT NOW all about the new Server and UI:

    https://www.youtube.com/watch?v=89W-jMVH410

    Jeremiah
    Laura Lorenz
    2 replies
  • a

    alex

    2 years ago
    Hello everyone, I would really appreciate some advice on how to structure a flow that applies various tasks to each item in a list. Here is a snippet:
    feeds = get_feeds()
    
    with Flow(..):
       for feed in feeds:
         run_feed = GetDataTask(name=f"GetData: {feed.feed_name}")(feed)
         latest = run_feed
    
         if feed.do_transform():
            transform_feed = TransformTask()(feed, upstream_tasks=[latest])
            latest = transform_feed
         
         # ... some more optional logic here ...
    
         failure_recovery = FailureRecoveryTask(trigger=any_failed)(feed, upstream_tasks=[latest])  # should handle any failure in any of above tasks
    
        
       mapped = feeds.map(
                    all_feeds,
                    target=unmapped("aggregated"),
                )
       mapped.set_upstream([failure_recovery])
    This structure isn't giving me the DAG I'm looking for, and I was wondering if anyone could give any advice on the most "prefect" way to do this. Some questions I had:
    • Should I initialize tasks before the flow, as the docs advise, or is this structure OK?
    • Is the if and latest= logic advisable? Or should I run the optional transformations anyway and have them set a "skipped" state?
    • How should I specify the aggregation task? Right now, the map task seems to only have a dependency on the last feed's failure_recovery_task.
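    (On the run-and-skip question, a minimal sketch of the Prefect 0.x alternative: the transform task is always added to the DAG and raises SKIP at run time when it has nothing to do, so the flow's shape no longer depends on build-time if statements; the helper name is hypothetical.)
    from prefect import task
    from prefect.engine.signals import SKIP

    @task
    def transform(feed):
        # decide at run time instead of at flow-build time
        if not feed.do_transform():
            raise SKIP(f"no transform needed for {feed.feed_name}")
        return apply_transform(feed)  # hypothetical helper
    Note that, by default, a skipped task causes its downstream tasks to skip as well (skip_on_upstream_skip=True), which may or may not be what the aggregation step wants.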
    a
    Jeremiah
    2 replies
  • r

    Riley Hun

    2 years ago
    Hi everyone - How do I write a Task as a class that also inherits from another parent class? For example, if I have:
    # parent class
    class ABC:
      def __init__(
        self,
        user,
        password
      ):
        self.user = user
        self.password = password

      def query(self, query):
        pass
    I want to do something like this
    # task
    class Task_A(Task, ABC):
      def run(self):
        pass
    Currently, I'm just doing this instead
    @task
    def task_a(
        user: str = None,
        password: str = None,
        date_col: str = None,
        dataset_id: str = None
    ):

        conn = ABC(user=user, password=password)
        query = f"SELECT DISTINCT {date_col} FROM EDW_DNA_DB.WI.{dataset_id}"
        query_result = conn.query(query)
        return query_result[date_col].tolist()
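    (A minimal sketch of the class-based version, assuming Prefect 0.x: the subclass inherits from both Task and ABC and calls both __init__ methods, forwarding extra keyword arguments to Task; the class name is illustrative.)
    from prefect import Task

    class QueryDatesTask(Task, ABC):  # ABC is the parent class defined above
        def __init__(self, user=None, password=None, **kwargs):
            ABC.__init__(self, user=user, password=password)
            Task.__init__(self, **kwargs)  # forwards Task options such as name or tags

        def run(self, date_col=None, dataset_id=None):
            # mirrors the decorator version above
            query = f"SELECT DISTINCT {date_col} FROM EDW_DNA_DB.WI.{dataset_id}"
            return self.query(query)[date_col].tolist()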
    r
    Jeremiah
    3 replies
  • bral

    bral

    2 years ago
    I have the same issue as @Riley Hun, but with the local agent. The flow depends on my own classes, and after registering and running successfully I get a "No module named" error. It's solved if I place the directory with the dependency inside my environment (C:\ProgramData\Anaconda3...). For example, Airflow has a plugins directory for this case. Does Prefect have a similar option?
    bral
    Chris White
    5 replies
  • q

    Qwame

    2 years ago
    Hello everyone, I have a task that several other tasks depend on in my Prefect flow. In Airflow I could do something like:
    F1 >> [f2, f3, f4, f5, f6]
    What's the best way to set these dependencies in Prefect? I notice that set_downstream doesn't accept a list of tasks. Is there an efficient way to do this? Also, does the new Prefect UI mean I don't need Docker to run it? Thanks!
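    (A minimal sketch of the same fan-out in Prefect 0.x: set the shared upstream on each downstream task inside the Flow context; task names mirror the Airflow example.)
    from prefect import Flow, task

    @task
    def f1():
        pass

    @task
    def f2():
        pass

    @task
    def f3():
        pass

    with Flow("fan-out") as flow:
        root = f1()
        for downstream in [f2(), f3()]:
            downstream.set_upstream(root)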
    q
    nicholas
    +1
    10 replies