  • Adisun Wheelock

    2 years ago
    The docker-compose.yml (in prefect/cli) that is run upon prefect server start brings up a postgres instance. If I already have a postgres instance running, is it possible through the CLI to omit the postgres service defined in the docker-compose.yml? Similar to passing specific service names to docker-compose up.
    6 replies
  • Crawford Collins

    2 years ago
    I'm having a little trouble using the SQLiteQuery task. https://docs.prefect.io/api/latest/tasks/sqlite.html#prefect-tasks-database-sqlite-sqlitequerytask I'm trying to pass the db name as a parameter, but it is not working. The query task does not accept Parameters, only strings. I've attached a concise version of my code and the error message below.
    def test_meta_model_regression():
        meta_model_run = meta_model_flow.run(
            db = "test.db",
        )
        assert meta_model_run.message == "All reference tasks succeeded."
    
    with Flow("meta_model_flow") as meta_model_flow:
        db = Parameter("db")
        models = SQLiteQuery(db,"SELECT identifier FROM models")
    
    >> TypeError: expected str, bytes or os.PathLike object, not Parameter
    Is there some way to pass the parameter "test.db" to the query?
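    One possible fix, sketched on the assumption that SQLiteQuery (like most Prefect tasks) also accepts db and query as runtime arguments when the task is called inside the flow block, where Parameters are resolved at run time rather than at flow-build time:
    from prefect import Flow, Parameter
    from prefect.tasks.database.sqlite import SQLiteQuery

    # instantiate the task once; defer the db argument to call time
    query_models = SQLiteQuery(query="SELECT identifier FROM models")

    with Flow("meta_model_flow") as meta_model_flow:
        db = Parameter("db")
        # calling the task inside the flow passes db at runtime, so the
        # Parameter value ("test.db") is substituted before run() executes
        models = query_models(db=db)

    meta_model_run = meta_model_flow.run(parameters={"db": "test.db"})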
    4 replies
  • Troy Sankey

    2 years ago
    We're just starting to build out a reusable library of Prefect tasks, but we ran into the fact that we can't actually serialize any of our flows because they refer to tasks defined in separate (common) Python files. For my testing, this has led me to copy and paste common code into each flow file (very not-DRY), so I'm wondering whether there's something obvious I'm missing? We're using the Docker storage environment.
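    One possible approach, sketched on the assumption that the shared code lives in a local file such as common_tasks.py (hypothetical name) and that your Prefect version's Docker storage accepts the files and env_vars arguments: copy the shared module into the image and put it on PYTHONPATH, so flows can import it instead of pasting the code in.
    from prefect import Flow
    from prefect.environments.storage import Docker  # newer releases may expose this as prefect.storage.Docker

    storage = Docker(
        registry_url="registry.example.com/prefect",   # assumption: your registry
        files={
            # copy the shared task module into the image
            "/abs/path/to/common_tasks.py": "/modules/common_tasks.py",
        },
        env_vars={"PYTHONPATH": "/modules"},           # makes `import common_tasks` resolvable in the container
    )

    with Flow("my-flow", storage=storage) as flow:
        ...
    If the shared tasks are packaged and pip-installable, listing the package under python_dependencies may be another option.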
    2 replies
  • Christopher Harris

    2 years ago
    Question #2: Here is an example of a flow I’m using: The idea is we pull in a list of “documents” from a single source and we want to push each document to every sink.
    def execute(pipeline_config: PipelineConfiguration):
        project = pipeline_config.project
        with Flow("test-flow") as flow:
            # Service Initialization
            source = init_source(project, pipeline_config.source)
            sinks = init_sink.map(unmapped(project), pipeline_config.sinks)
    
            # Perform ETL
            documents = pull(source)
            push.map(documents, sinks)
    
        flow.run(executor=LocalExecutor())
    The problem with this approach is that it does a one-to-one mapping, like the first image. I want a many-to-one mapping, like the second image. Effectively I am trying to recreate the following logic:
    for each document:
        for each sink:
            sink.push(doc)
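    One way to get that many-to-one behaviour, sketched with a hypothetical cross_product helper (pull, push, init_source, and init_sink are the tasks from the snippet above): build the document x sink cross product in a task, then map push over the paired lists.
    from itertools import product

    from prefect import Flow, task, unmapped

    @task
    def cross_product(documents, sinks):
        # pair every document with every sink, returned as two parallel lists
        pairs = list(product(documents, sinks))
        return [d for d, _ in pairs], [s for _, s in pairs]

    with Flow("test-flow") as flow:
        source = init_source(project, pipeline_config.source)
        sinks = init_sink.map(unmapped(project), pipeline_config.sinks)
        documents = pull(source)

        pairs = cross_product(documents, sinks)
        # indexing a task result creates GetItem tasks, so each list can be mapped over
        push.map(pairs[0], pairs[1])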
    8 replies
  • Brad

    2 years ago
    Hey team, I’m trying to run the Docker agent inside of a Docker container but I’m having some trouble. Has anyone attempted this?
    9 replies
  • Tom B

    2 years ago
    I have certain tasks that need to run on a monthly basis and other tasks that need to run on a daily basis. The outputs from the monthly tasks are inputs to the daily tasks. Can these tasks be run in a single flow, or do I need to create two different flows, each tied to a different interval schedule (monthly and daily, respectively)? I can't seem to get two tasks with different scheduling intervals to run in a single flow. Can anyone help?
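    One possible single-flow pattern, sketched on the assumption that task output caching (cache_for) fits the use case and that cached state persists between runs (which generally requires Prefect Cloud/Server): schedule the flow daily and cache the monthly task's output so daily runs reuse it until the cache expires.
    from datetime import timedelta

    from prefect import Flow, task
    from prefect.schedules import IntervalSchedule

    # hypothetical task names
    @task(cache_for=timedelta(days=31))
    def monthly_extract():
        # recomputed only when the roughly-monthly cache has expired
        return "monthly output"

    @task
    def daily_transform(monthly_data):
        ...

    schedule = IntervalSchedule(interval=timedelta(days=1))

    with Flow("daily-with-monthly-input", schedule=schedule) as flow:
        daily_transform(monthly_extract())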
    10 replies
  • Joe Schmid

    2 years ago
    Is there a preferred logging approach from within callbacks? The use case is an on_execute() (or the existing on_start()) called from an Environment. I can pass the environment's self.logger -- that seems clunky, but the following doesn't seem to work:
    from typing import Any, Dict

    import prefect

    def on_execute(parameters: Dict[str, Any], provider_kwargs: Dict[str, Any]) -> None:
        logger = prefect.context.get("logger")
        logger.info("Checking Flow run parameters: {}".format(parameters))
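    One possible alternative, sketched on the assumption that the callback runs outside of a task/flow-run context (so the context logger may not be populated): grab a module-level Prefect logger directly.
    from typing import Any, Dict

    from prefect.utilities.logging import get_logger

    def on_execute(parameters: Dict[str, Any], provider_kwargs: Dict[str, Any]) -> None:
        # get_logger returns a logger wired into Prefect's logging configuration,
        # without relying on prefect.context being populated
        logger = get_logger("on_execute")
        logger.info("Checking Flow run parameters: %s", parameters)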
    5 replies
  • Arsenii

    2 years ago
    Has anyone run into a bug where, with a couple of tasks mapping over a list of 10-20 elements (or more) in the Cloud, the execution just randomly stops? The last log line is
    Task 'Some important work[19]': Calling task.run() method...
    and it just "runs" this task forever, but there's no
    changed state from ... to Running
    afterwards, when using a LocalExecutor. I thought this might have something to do with my using raise SKIP to "filter" some elements, but rewriting with a FilterTask didn't fix the problem.
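    For reference, a sketch of the FilterTask pattern mentioned above (task names are hypothetical), filtering the mapped results downstream instead of raising SKIP inside the mapped task:
    from prefect import Flow, task
    from prefect.tasks.control_flow import FilterTask

    # FilterTask applies filter_func to each element of the upstream list
    keep_valid = FilterTask(filter_func=lambda x: x is not None)

    @task
    def important_work(x):
        # hypothetical: None marks an element that should be filtered out
        return x if x % 2 else None

    with Flow("filtered-map") as flow:
        results = important_work.map(list(range(20)))
        valid = keep_valid(results)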
    27 replies
  • Ziyao Wei

    2 years ago
    Is there an easy way to pass persistent data between flow runs?
    6 replies
  • Ziyao Wei

    2 years ago
    Does Prefect support Airflow’s max_active_runs or a similar option?
    4 replies