matta
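02/18/2021, 7:31 PM
# upstream_tasks=[dbt_run] makes get_filepaths wait for dbt_run to finish without passing it any data
filepaths = get_filepaths(dbt_path, upstream_tasks=[dbt_run])
publish_artifact(filepaths)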
matta
02/18/2021, 7:34 PM
Craig Wright
02/18/2021, 8:07 PM
FivetranSyncTask, which lets you kick off a sync with Fivetran and tells you when that sync has completed (or failed). The goal is to let users be more deterministic about when to start processing or transforming data that Fivetran has landed in a data warehouse. For example, even though Fivetran's UI lets you set up a sync and also set up dbt transformations, we currently don't have the functionality to run the dbt transformation after a sync has completed. You could do this now with Prefect and the FivetranSyncTask.
This is our very first effort to start bridging the gap between our automated pipelines and the more robust scheduling needs of a Prefect user. It is very early days, and we would love feedback! Please reach out to me and/or @Nick Acosta. We would love to talk to folks who would be interested in this, especially to learn where it does or does not meet your needs. Thanks!
S K
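The "kick off a sync, then tell you when it has completed (or failed)" behavior described here boils down to a poll-until-terminal-state loop. Below is a minimal plain-Python sketch of that pattern. `wait_for_sync`, `get_status`, `SyncFailed`, and the status strings are hypothetical placeholders for illustration; they are not the FivetranSyncTask API or Fivetran's REST API.

```python
import time
from typing import Callable


class SyncFailed(RuntimeError):
    """Raised when the (hypothetical) status source reports a failed sync."""


def wait_for_sync(
    get_status: Callable[[], str],
    poll_interval: float = 5.0,
    timeout: float = 600.0,
) -> str:
    """Poll get_status() until the sync reaches a terminal state.

    Assumed statuses (hypothetical): "running", "completed", "failed".
    Returns "completed", raises SyncFailed on "failed", and raises
    TimeoutError if the sync is still running when the timeout expires.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == "completed":
            return status
        if status == "failed":
            raise SyncFailed("sync reported failure")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"sync still {status!r} after {timeout} seconds")
        time.sleep(poll_interval)


# Simulated status source: two polls while running, then done.
statuses = iter(["running", "running", "completed"])
print(wait_for_sync(lambda: next(statuses), poll_interval=0))  # completed
```

In a flow, a downstream step such as a dbt run would simply depend on this one, so it starts only after the sync has returned successfully.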
02/18/2021, 11:05 PM
CA Lee
02/21/2021, 5:31 AM
CA Lee
02/27/2021, 12:14 PM
Slackbot
02/27/2021, 7:24 PM
Slackbot
03/05/2021, 7:18 PM
Slackbot
03/17/2021, 10:18 PM
matta
03/26/2021, 2:44 AM
The toolz library works pretty seamlessly with Prefect maps, so you can use pipe, compose, thread_first, or thread_last to chain operations: https://toolz.readthedocs.io/en/latest/api.html#toolz.functoolz.pipe
So, taking the example from the Horizontal Mapping blog post, this:
with Flow("mapping-test") as flow:
    sleep.map(sleep.map(times))
can become this:
import toolz as tz
with Flow("mapping-test") as flow:
    tz.pipe(times,
            sleep.map,
            sleep.map)
which I find a bit more readable!
Or it lets you turn:
with Flow('iterated map') as flow:
    mapped_result = add_ten.map([1, 2, 3])
    mapped_result_2 = add_ten.map(mapped_result)
into
with Flow('asdasds') as flow:
    mapped_result = tz.pipe([1, 2, 3],
                            add_ten.map,
                            add_ten.map)
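To make the argument ordering of these helpers concrete, here is a plain-Python sketch of what pipe, compose, and thread_last do. These are small re-implementations written for illustration (so the example runs without toolz or Prefect installed), not the toolz source; `add_ten_map` is a hypothetical stand-in for Prefect's `add_ten.map`.

```python
from functools import reduce


def pipe(data, *funcs):
    """Left to right: pipe(x, f, g) == g(f(x)), like toolz.pipe."""
    for func in funcs:
        data = func(data)
    return data


def compose(*funcs):
    """Right to left: compose(f, g)(x) == f(g(x)), like toolz.compose."""
    return lambda x: reduce(lambda acc, f: f(acc), reversed(funcs), x)


def thread_last(val, *forms):
    """Like toolz.thread_last: each form is a function or a (func, *args)
    tuple, and val is passed in as the *last* argument."""
    for form in forms:
        if callable(form):
            val = form(val)
        else:
            func, *args = form
            val = func(*args, val)
    return val


def add_ten(x):
    return x + 10


def add_ten_map(xs):
    # Plain-Python stand-in for add_ten.map: one call per element.
    return [add_ten(x) for x in xs]


nested = add_ten_map(add_ten_map([1, 2, 3]))
piped = pipe([1, 2, 3], add_ten_map, add_ten_map)
composed = compose(add_ten_map, add_ten_map)([1, 2, 3])
threaded = thread_last([1, 2, 3], (map, add_ten), (map, add_ten), list)
print(nested, piped, composed, threaded)  # all four are [21, 22, 23]
```

thread_first works the same way as thread_last except that the threaded value is inserted as the first argument, which is the form you want when the piped value is a function's leading parameter.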
Jacopo Tagliabue
04/02/2021, 4:43 PM
https://github.com/jacopotagliabue/metaflow-as-prefect-task
Screenshot overview here: https://drive.google.com/file/d/1XoG8UfPpiCSuXYvp9Y27zm6_E4D3C9NX/view
Open to feedback, especially if you think it's a broad enough use case to deserve a much more polished task class (which can totally be done, and I'm happy to work on it together with others!)
Aaron Richter
04/12/2021, 12:23 AM
Andrew Moist
04/13/2021, 11:18 AM
ale
04/16/2021, 12:05 PM
Maikel Penz
04/27/2021, 9:18 PM
flavienbwk
04/29/2021, 7:57 PM
Dylan
Chris White
Josiah Berkebile
05/21/2021, 3:27 PM
Nelson Griffiths
05/21/2021, 9:02 PM
Kevin Kho
Jason Prado
05/31/2021, 10:59 PM
matta
06/25/2021, 9:39 PM
Git Storage to point it to code on a private repo):
# This is a basic workflow to help you get started with Actions
name: CI
# Controls when the action will run.
on:
  # Triggers the workflow on push or pull request events but only for the main branch
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
  # Allows you to run this workflow manually from the Actions tab
  workflow_dispatch:
# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:
  # This workflow contains a single job called "build"
  build:
    # The type of runner that the job will run on
    runs-on: ubuntu-latest
    # Steps represent a sequence of tasks that will be executed as part of the job
    steps:
      - uses: actions/checkout@v2
      - uses: conda-incubator/setup-miniconda@v2
        with:
          auto-update-conda: true
          python-version: 3.8
      - name: install prefect
        shell: bash -l {0}
        run: conda install -c conda-forge prefect -y
      - name: login to Prefect Cloud
        shell: bash -l {0}
        run: prefect auth login -t ${{secrets.PREFECT_TOKEN}}
      - name: Register flow
        shell: bash -l {0}
        run: prefect register --project tester -p test.py -n "run-cloud-fn"
matta
06/28/2021, 11:57 PM
Secret in Prefect)
import prefect
from prefect import task, Flow, Parameter
from prefect.tasks.secrets.base import PrefectSecret
from google.oauth2 import service_account
from google.auth.transport.requests import AuthorizedSession
@task
def trigger_cloud_fn(secret: dict, url: str, body: dict):
    # `secret` is the service-account JSON value returned by a PrefectSecret task
    credentials = service_account.IDTokenCredentials.from_service_account_info(
        secret, target_audience=url
    )
    # AuthorizedSession attaches the ID token as a bearer token on each request
    authed_session = AuthorizedSession(credentials)
    response = authed_session.post(url=url, json=body)
    return response
Kevin Kho
Kevin Kho
matta
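08/03/2021, 4:21 AM
import datetime
import typing as t
import prefect
from prefect import task
from prefect.engine import signals
from google.oauth2 import service_account
from google.auth.transport.requests import AuthorizedSession
@task(max_retries=3, retry_delay=datetime.timedelta(seconds=30))
def trigger_cloud_fn(secret: dict, url: str, body: t.Optional[t.Dict] = None):
    logger = prefect.context.get("logger")
    body = body or {}  # avoid a shared mutable default argument
    logger.info(body)
    credentials = service_account.IDTokenCredentials.from_service_account_info(
        secret, target_audience=url
    )
    # The with-block closes the session even if the request raises
    with AuthorizedSession(credentials) as authed_session:
        response = authed_session.post(url=url, json=body)
    if not response.ok:
        # FAIL puts this task run in a Failed state, which the retry settings above can act on
        raise signals.FAIL(f"Cloud Function returned HTTP {response.status_code}")
    return response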
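With `max_retries=3` and a `retry_delay`, a run that keeps failing is attempted up to four times in total (the initial run plus three retries), with a pause between attempts. The sketch below approximates that behavior in plain Python so it can run on its own. `run_with_retries` and `flaky_trigger` are hypothetical helpers for illustration; Prefect's task runner tracks retries through task states rather than with a loop like this.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], max_retries: int = 3, retry_delay: float = 30.0) -> T:
    """Call fn(); on an exception, retry up to max_retries more times,
    sleeping retry_delay seconds between attempts (hypothetical helper)."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last failure
            attempt += 1
            time.sleep(retry_delay)


calls = []


def flaky_trigger():
    # Stand-in for the Cloud Function call: fails twice, then succeeds.
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("HTTP 500")
    return "ok"


print(run_with_retries(flaky_trigger, retry_delay=0), len(calls))  # ok 3
```

Raising on a non-OK response, as the task does with `signals.FAIL`, is what lets the retry settings kick in; a task that returned the failed response silently would be marked successful and never retried.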
Gleb Mezhanskiy (Datafold)
08/19/2021, 4:17 PM
Gareth Dwyer
08/20/2021, 10:56 AM